From 2d2f7828c9f16691a1fc4b626d2f0e98bc98fdec Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Thu, 9 Apr 2026 15:56:06 -0700 Subject: [PATCH] Fix persistence --- apps/sim/app/api/admin/mothership/route.ts | 130 +++ apps/sim/app/api/copilot/chat/stream/route.ts | 7 +- .../api/mothership/chats/[chatId]/route.ts | 5 +- .../[workspaceId]/home/hooks/use-chat.ts | 7 +- .../settings/[section]/settings.tsx | 12 +- .../components/mothership/mothership.tsx | 908 ++++++++++++++++++ .../[workspaceId]/settings/navigation.ts | 8 + apps/sim/hooks/queries/mothership-admin.ts | 131 +++ apps/sim/hooks/queries/tasks.ts | 3 +- .../lib/copilot/chat/terminal-state.test.ts | 102 ++ .../copilot/request/lifecycle/start.test.ts | 8 +- .../lib/copilot/request/lifecycle/start.ts | 2 + .../copilot/request/session/buffer.test.ts | 72 +- .../sim/lib/copilot/request/session/buffer.ts | 31 +- apps/sim/lib/copilot/request/session/index.ts | 2 + apps/sim/lib/copilot/request/session/types.ts | 15 + apps/sim/lib/core/config/env.ts | 6 + 17 files changed, 1427 insertions(+), 22 deletions(-) create mode 100644 apps/sim/app/api/admin/mothership/route.ts create mode 100644 apps/sim/app/workspace/[workspaceId]/settings/components/mothership/mothership.tsx create mode 100644 apps/sim/hooks/queries/mothership-admin.ts create mode 100644 apps/sim/lib/copilot/chat/terminal-state.test.ts diff --git a/apps/sim/app/api/admin/mothership/route.ts b/apps/sim/app/api/admin/mothership/route.ts new file mode 100644 index 0000000000..3d3d722151 --- /dev/null +++ b/apps/sim/app/api/admin/mothership/route.ts @@ -0,0 +1,130 @@ +import { type NextRequest, NextResponse } from 'next/server' +import { getSession } from '@/lib/auth' +import { env } from '@/lib/core/config/env' + +const ENV_URLS: Record = { + dev: env.MOTHERSHIP_DEV_URL, + staging: env.MOTHERSHIP_STAGING_URL, + prod: env.MOTHERSHIP_PROD_URL, +} + +function getMothershipUrl(environment: string): string | null { + return ENV_URLS[environment] ?? null +} + +/** + * Proxy to the mothership admin API. + * + * Query params: + * env - "dev" | "staging" | "prod" + * endpoint - the admin endpoint path, e.g. "requests", "licenses", "traces" + * + * The request body (for POST) is forwarded as-is. Additional query params + * (e.g. requestId for GET /traces) are forwarded. + */ +export async function POST(req: NextRequest) { + const session = await getSession() + if (!session?.user || session.user.role !== 'admin') { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const adminKey = env.MOTHERSHIP_API_ADMIN_KEY + if (!adminKey) { + return NextResponse.json({ error: 'MOTHERSHIP_API_ADMIN_KEY not configured' }, { status: 500 }) + } + + const { searchParams } = new URL(req.url) + const environment = searchParams.get('env') || 'dev' + const endpoint = searchParams.get('endpoint') + + if (!endpoint) { + return NextResponse.json({ error: 'endpoint query param required' }, { status: 400 }) + } + + const baseUrl = getMothershipUrl(environment) + if (!baseUrl) { + return NextResponse.json( + { error: `No URL configured for environment: ${environment}` }, + { status: 400 } + ) + } + + const targetUrl = `${baseUrl}/api/admin/${endpoint}` + + try { + const body = await req.text() + const upstream = await fetch(targetUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-api-key': adminKey, + }, + ...(body ? { body } : {}), + }) + + const data = await upstream.json() + return NextResponse.json(data, { status: upstream.status }) + } catch (error) { + return NextResponse.json( + { + error: `Failed to reach mothership (${environment}): ${error instanceof Error ? error.message : 'Unknown error'}`, + }, + { status: 502 } + ) + } +} + +export async function GET(req: NextRequest) { + const session = await getSession() + if (!session?.user || session.user.role !== 'admin') { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const adminKey = env.MOTHERSHIP_API_ADMIN_KEY + if (!adminKey) { + return NextResponse.json({ error: 'MOTHERSHIP_API_ADMIN_KEY not configured' }, { status: 500 }) + } + + const { searchParams } = new URL(req.url) + const environment = searchParams.get('env') || 'dev' + const endpoint = searchParams.get('endpoint') + + if (!endpoint) { + return NextResponse.json({ error: 'endpoint query param required' }, { status: 400 }) + } + + const baseUrl = getMothershipUrl(environment) + if (!baseUrl) { + return NextResponse.json( + { error: `No URL configured for environment: ${environment}` }, + { status: 400 } + ) + } + + const forwardParams = new URLSearchParams() + searchParams.forEach((value, key) => { + if (key !== 'env' && key !== 'endpoint') { + forwardParams.set(key, value) + } + }) + + const qs = forwardParams.toString() + const targetUrl = `${baseUrl}/api/admin/${endpoint}${qs ? `?${qs}` : ''}` + + try { + const upstream = await fetch(targetUrl, { + method: 'GET', + headers: { 'x-api-key': adminKey }, + }) + + const data = await upstream.json() + return NextResponse.json(data, { status: upstream.status }) + } catch (error) { + return NextResponse.json( + { + error: `Failed to reach mothership (${environment}): ${error instanceof Error ? error.message : 'Unknown error'}`, + }, + { status: 502 } + ) + } +} diff --git a/apps/sim/app/api/copilot/chat/stream/route.ts b/apps/sim/app/api/copilot/chat/stream/route.ts index 9ccf07b399..9f1323a215 100644 --- a/apps/sim/app/api/copilot/chat/stream/route.ts +++ b/apps/sim/app/api/copilot/chat/stream/route.ts @@ -13,6 +13,7 @@ import { readEvents, SSE_RESPONSE_HEADERS, } from '@/lib/copilot/request/session' +import { toStreamBatchEvent } from '@/lib/copilot/request/session/types' export const maxDuration = 3600 @@ -113,11 +114,7 @@ export async function GET(request: NextRequest) { if (batchMode) { const afterSeq = afterCursor || '0' const events = await readEvents(streamId, afterSeq) - const batchEvents = events.map((envelope) => ({ - eventId: envelope.seq, - streamId: envelope.stream.streamId, - event: envelope, - })) + const batchEvents = events.map(toStreamBatchEvent) logger.info('[Resume] Batch response', { streamId, afterCursor: afterSeq, diff --git a/apps/sim/app/api/mothership/chats/[chatId]/route.ts b/apps/sim/app/api/mothership/chats/[chatId]/route.ts index 5800507f9f..39c1a7465d 100644 --- a/apps/sim/app/api/mothership/chats/[chatId]/route.ts +++ b/apps/sim/app/api/mothership/chats/[chatId]/route.ts @@ -12,6 +12,7 @@ import { createUnauthorizedResponse, } from '@/lib/copilot/request/http' import { readEvents } from '@/lib/copilot/request/session/buffer' +import { type StreamBatchEvent, toStreamBatchEvent } from '@/lib/copilot/request/session/types' import { taskPubSub } from '@/lib/copilot/tasks' import { captureServerEvent } from '@/lib/posthog/server' @@ -47,7 +48,7 @@ export async function GET( } let streamSnapshot: { - events: unknown[] + events: StreamBatchEvent[] status: string } | null = null @@ -56,7 +57,7 @@ export async function GET( const events = await readEvents(chat.conversationId, '0') streamSnapshot = { - events: events || [], + events: events.map(toStreamBatchEvent), status: events.length > 0 ? 'active' : 'unknown', } } catch (error) { diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 58b46cd066..2fa506bb0c 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -35,6 +35,7 @@ import { ToolSearchToolRegex, WorkspaceFile, } from '@/lib/copilot/generated/tool-catalog-v1' +import type { StreamBatchEvent } from '@/lib/copilot/request/session/types' import { extractResourcesFromToolResult, isResourceToolName, @@ -148,12 +149,6 @@ type StreamingFilePreview = { content: string } -type StreamBatchEvent = { - eventId: number - streamId: string - event: Record -} - type StreamBatchResponse = { success: boolean events: StreamBatchEvent[] diff --git a/apps/sim/app/workspace/[workspaceId]/settings/[section]/settings.tsx b/apps/sim/app/workspace/[workspaceId]/settings/[section]/settings.tsx index ddda31f2a4..aa6b9bf34b 100644 --- a/apps/sim/app/workspace/[workspaceId]/settings/[section]/settings.tsx +++ b/apps/sim/app/workspace/[workspaceId]/settings/[section]/settings.tsx @@ -142,6 +142,13 @@ const Admin = dynamic( import('@/app/workspace/[workspaceId]/settings/components/admin/admin').then((m) => m.Admin), { loading: () => } ) +const Mothership = dynamic( + () => + import('@/app/workspace/[workspaceId]/settings/components/mothership/mothership').then( + (m) => m.Mothership + ), + { loading: () => } +) const RecentlyDeleted = dynamic( () => import( @@ -175,7 +182,9 @@ export function SettingsPage({ section }: SettingsPageProps) { ? 'general' : section === 'admin' && !sessionLoading && !isAdminRole ? 'general' - : section + : section === 'mothership' && !sessionLoading && !isAdminRole + ? 'general' + : section const label = allNavigationItems.find((item) => item.id === effectiveSection)?.label ?? effectiveSection @@ -207,6 +216,7 @@ export function SettingsPage({ section }: SettingsPageProps) { {effectiveSection === 'inbox' && } {effectiveSection === 'recently-deleted' && } {effectiveSection === 'admin' && } + {effectiveSection === 'mothership' && } ) } diff --git a/apps/sim/app/workspace/[workspaceId]/settings/components/mothership/mothership.tsx b/apps/sim/app/workspace/[workspaceId]/settings/components/mothership/mothership.tsx new file mode 100644 index 0000000000..aa96075b40 --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/settings/components/mothership/mothership.tsx @@ -0,0 +1,908 @@ +'use client' + +import { useCallback, useMemo, useState } from 'react' +import { Badge, Button, Input as EmcnInput, Label, Skeleton } from '@/components/emcn' +import { cn } from '@/lib/core/utils/cn' +import { + type MothershipEnv, + useGenerateLicense, + useMothershipEnterpriseStats, + useMothershipLicenses, + useMothershipRequests, + useMothershipTrace, + useMothershipUserBreakdown, +} from '@/hooks/queries/mothership-admin' + +type Tab = 'overview' | 'licenses' | 'enterprise' | 'traces' + +const TABS: { id: Tab; label: string }[] = [ + { id: 'overview', label: 'Overview' }, + { id: 'licenses', label: 'Licenses' }, + { id: 'enterprise', label: 'Enterprise' }, + { id: 'traces', label: 'Traces' }, +] + +const ENV_OPTIONS: { id: MothershipEnv; label: string }[] = [ + { id: 'dev', label: 'Dev' }, + { id: 'staging', label: 'Staging' }, + { id: 'prod', label: 'Prod' }, +] + +function defaultTimeRange() { + const end = new Date() + const start = new Date() + start.setDate(start.getDate() - 7) + return { + start: start.toISOString().slice(0, 16), + end: end.toISOString().slice(0, 16), + } +} + +function toRFC3339(local: string) { + if (!local) return '' + return new Date(local).toISOString() +} + +function formatCost(cost: number) { + return `$${cost.toFixed(4)}` +} + +function formatDate(d: string | null | undefined) { + if (!d) return '—' + return new Date(d).toLocaleString() +} + +function Divider() { + return
+} + +function SectionLabel({ children }: { children: React.ReactNode }) { + return

{children}

+} + +export function Mothership() { + const [activeTab, setActiveTab] = useState('overview') + const [environment, setEnvironment] = useState('dev') + const defaults = useMemo(() => defaultTimeRange(), []) + const [start, setStart] = useState(defaults.start) + const [end, setEnd] = useState(defaults.end) + + return ( +
+ {/* Environment selector */} +
+ +
+ {ENV_OPTIONS.map((opt) => ( + + ))} +
+
+ + {/* Tab bar */} +
+ {TABS.map((tab) => ( + + ))} +
+ + {/* Time range (shared across tabs) */} +
+
+ + setStart(e.target.value)} + className='h-[30px] text-caption' + /> +
+
+ + setEnd(e.target.value)} + className='h-[30px] text-caption' + /> +
+
+ + + + {activeTab === 'overview' && ( + + )} + {activeTab === 'licenses' && } + {activeTab === 'enterprise' && ( + + )} + {activeTab === 'traces' && } +
+ ) +} + +/* ─── Overview Tab ─── */ + +function OverviewTab({ + environment, + start, + end, +}: { + environment: MothershipEnv + start: string + end: string +}) { + const { data: breakdown, isLoading: breakdownLoading } = useMothershipUserBreakdown( + environment, + start, + end + ) + const { data: requests, isLoading: requestsLoading } = useMothershipRequests( + environment, + start, + end + ) + + return ( +
+ {/* Summary cards */} +
+ + + s + u.total_cost, + 0 + ) + ) + : undefined + } + loading={breakdownLoading} + /> + s + u.total_cost, + 0 + ) / breakdown.total_requests + ) + : undefined + } + loading={breakdownLoading} + /> +
+ + {/* User breakdown */} + User Breakdown + {breakdownLoading && ( +
+ {Array.from({ length: 5 }).map((_, i) => ( + + ))} +
+ )} + {breakdown?.users && ( +
+
+ User ID + Requests + Cost + Last Request +
+ {breakdown.users.map( + (u: { + user_id: string + request_count: number + total_cost: number + last_request: string + }) => ( +
+ + {u.user_id} + + + {u.request_count} + + + {formatCost(u.total_cost)} + + + {formatDate(u.last_request)} + +
+ ) + )} +
+ )} + + {/* Recent requests */} + + Recent Requests ({requests?.count ?? '…'}) + {requestsLoading && ( +
+ {Array.from({ length: 5 }).map((_, i) => ( + + ))} +
+ )} + {requests?.requests && ( +
+
+
+ Request ID + Model + Duration + Cost + Tools + Status + Time +
+ {requests.requests + .slice(0, 100) + .map( + (r: { + request_id: string + model: string + duration_ms: number + billed_total_cost: number + tool_call_count: number + error: boolean + aborted: boolean + created_at: string + }) => ( +
+ + {r.request_id ?? '—'} + + + {(r.model ?? '').replace('claude-', '')} + + + {r.duration_ms ? `${(r.duration_ms / 1000).toFixed(1)}s` : '—'} + + + {formatCost(r.billed_total_cost ?? 0)} + + + {r.tool_call_count ?? 0} + + + {r.error ? ( + Error + ) : r.aborted ? ( + Abort + ) : ( + OK + )} + + + {formatDate(r.created_at)} + +
+ ) + )} +
+
+ )} +
+ ) +} + +/* ─── Licenses Tab ─── */ + +function LicensesTab({ environment }: { environment: MothershipEnv }) { + const { data, isLoading, refetch } = useMothershipLicenses(environment) + const generateLicense = useGenerateLicense(environment) + const [newName, setNewName] = useState('') + const [newExpiry, setNewExpiry] = useState('') + const [generatedKey, setGeneratedKey] = useState(null) + + const handleGenerate = useCallback(() => { + if (!newName.trim()) return + generateLicense.mutate( + { + name: newName.trim(), + ...(newExpiry ? { expirationDate: newExpiry } : {}), + }, + { + onSuccess: (result) => { + setGeneratedKey(result.license_key) + setNewName('') + setNewExpiry('') + refetch() + }, + } + ) + }, [newName, newExpiry, generateLicense, refetch]) + + return ( +
+ Generate License +
+
+ + { + setNewName(e.target.value) + setGeneratedKey(null) + }} + placeholder='e.g. Acme Corp' + className='h-[32px] w-[200px]' + /> +
+
+ + setNewExpiry(e.target.value)} + className='h-[32px] w-[160px]' + /> +
+ +
+ + {generatedKey && ( +
+

+ License key (only shown once): +

+ + {generatedKey} + +
+ )} + + {generateLicense.error && ( +

{generateLicense.error.message}

+ )} + + + All Licenses + + {isLoading && ( +
+ {Array.from({ length: 3 }).map((_, i) => ( + + ))} +
+ )} + + {data?.licenses && ( +
+
+ Name + Validations + Expiration + Created +
+ {data.licenses.length === 0 && ( +
+ No licenses found. +
+ )} + {data.licenses.map( + (lic: { + id: string + name: string + count: number + expiration_date?: string + created_at: string + }) => ( +
+ {lic.name} + + {lic.count} + + + {lic.expiration_date ? formatDate(lic.expiration_date) : 'Never'} + + + {formatDate(lic.created_at)} + +
+ ) + )} +
+ )} +
+ ) +} + +/* ─── Enterprise Tab ─── */ + +function EnterpriseTab({ + environment, + start, + end, +}: { + environment: MothershipEnv + start: string + end: string +}) { + const [customerType, setCustomerType] = useState('') + const [searchInput, setSearchInput] = useState('') + const { data, isLoading, error } = useMothershipEnterpriseStats( + environment, + customerType, + start, + end + ) + + const handleSearch = () => { + setCustomerType(searchInput.trim()) + } + + return ( +
+
+ setSearchInput(e.target.value)} + onKeyDown={(e) => e.key === 'Enter' && handleSearch()} + placeholder='Enter customer type (e.g. enterprise name)...' + /> + +
+ + {error &&

{error.message}

} + + {isLoading && customerType && ( +
+ {Array.from({ length: 4 }).map((_, i) => ( + + ))} +
+ )} + + {data && ( + <> +
+ + + + +
+ + {data.top_models && ( + <> + + Top Models +
+ {data.top_models.map((m: { model: string; count: number }) => ( + + {m.model} ({m.count}) + + ))} +
+ + )} + + {data.users && ( + <> + + User Breakdown +
+
+ User ID + Requests + Cost + Last Request +
+ {data.users.map( + (u: { + user_id: string + request_count: number + total_cost: number + last_request: string + }) => ( +
+ + {u.user_id} + + + {u.request_count} + + + {formatCost(u.total_cost)} + + + {formatDate(u.last_request)} + +
+ ) + )} +
+ + )} + + )} +
+ ) +} + +/* ─── Traces Tab ─── */ + +function TracesTab({ environment }: { environment: MothershipEnv }) { + const [requestIdInput, setRequestIdInput] = useState('') + const [activeRequestId, setActiveRequestId] = useState('') + const { data: trace, isLoading, error } = useMothershipTrace(environment, activeRequestId) + + const handleLookup = () => { + setActiveRequestId(requestIdInput.trim()) + } + + return ( +
+
+ setRequestIdInput(e.target.value)} + onKeyDown={(e) => e.key === 'Enter' && handleLookup()} + placeholder='Paste a request ID (go_trace_id)...' + className='font-mono text-[13px]' + /> + +
+ + {error &&

{error.message}

} + + {isLoading && activeRequestId && ( +
+ {Array.from({ length: 4 }).map((_, i) => ( + + ))} +
+ )} + + {trace && } +
+ ) +} + +/* ─── Trace Detail ─── */ + +interface TraceSpan { + name: string + kind?: string + startMs: number + endMs?: number + durationMs?: number + status: string + parentName?: string + source?: string + attributes?: Record +} + +interface TraceData { + id: string + simRequestId: string + goTraceId: string + streamId?: string + chatId?: string + userId?: string + startMs: number + endMs: number + durationMs: number + outcome: string + spans: TraceSpan[] + model?: string + provider?: string + mode?: string + source?: string + message?: string + inputTokens?: number + outputTokens?: number + cacheReadTokens?: number + cacheWriteTokens?: number + rawTotalCost?: number + billedTotalCost?: number + toolCallCount?: number + error?: boolean + aborted?: boolean + errorMsg?: string +} + +function TraceDetail({ trace }: { trace: TraceData }) { + const rootSpans = trace.spans.filter((s) => !s.parentName) + const childMap = new Map() + for (const span of trace.spans) { + if (span.parentName) { + const existing = childMap.get(span.parentName) || [] + existing.push(span) + childMap.set(span.parentName, existing) + } + } + + return ( +
+ {/* Trace metadata */} +
+ + + + + {trace.outcome} + + + + + + + + {trace.userId && } + {trace.chatId && } + + + {trace.toolCallCount != null && trace.toolCallCount > 0 && ( + + )} + {trace.message && ( +
+ +
+ )} + {trace.errorMsg && ( +
+ + {trace.errorMsg} + +
+ )} +
+ + {/* Span tree */} + Spans ({trace.spans.length}) +
+ {rootSpans + .sort((a, b) => a.startMs - b.startMs) + .map((span) => ( + + ))} +
+
+ ) +} + +function SpanNode({ + span, + childMap, + traceStartMs, + traceDurationMs, + depth, +}: { + span: TraceSpan + childMap: Map + traceStartMs: number + traceDurationMs: number + depth: number +}) { + const [expanded, setExpanded] = useState(depth < 2) + const children = childMap.get(span.name) || [] + const hasChildren = children.length > 0 + const durationMs = span.durationMs ?? (span.endMs ? span.endMs - span.startMs : 0) + const offsetPct = + traceDurationMs > 0 ? ((span.startMs - traceStartMs) / traceDurationMs) * 100 : 0 + const widthPct = traceDurationMs > 0 ? (durationMs / traceDurationMs) * 100 : 0 + + const statusColor = + span.status === 'ok' + ? 'bg-emerald-500/70' + : span.status === 'error' + ? 'bg-red-500/70' + : span.status === 'cancelled' + ? 'bg-yellow-500/70' + : 'bg-[var(--text-tertiary)]' + + const attrs = span.attributes || {} + const attrEntries = Object.entries(attrs).filter( + ([, v]) => v !== null && v !== undefined && v !== '' + ) + + return ( +
+ + + {expanded && attrEntries.length > 0 && ( +
+ {attrEntries.map(([key, val]) => ( +
+ {key}: + + {typeof val === 'object' ? JSON.stringify(val) : String(val)} + +
+ ))} +
+ )} + + {expanded && + children + .sort((a, b) => a.startMs - b.startMs) + .map((child) => ( + + ))} +
+ ) +} + +/* ─── Shared components ─── */ + +function StatCard({ + label, + value, + loading, +}: { + label: string + value?: string | number + loading?: boolean +}) { + return ( +
+

{label}

+ {loading ? ( + + ) : ( +

{value ?? '—'}

+ )} +
+ ) +} + +function MetaRow({ + label, + value, + mono, + children, +}: { + label: string + value?: string + mono?: boolean + children?: React.ReactNode +}) { + return ( +
+ {label} + {children || ( + + {value} + + )} +
+ ) +} diff --git a/apps/sim/app/workspace/[workspaceId]/settings/navigation.ts b/apps/sim/app/workspace/[workspaceId]/settings/navigation.ts index f97e676866..f4760ac7eb 100644 --- a/apps/sim/app/workspace/[workspaceId]/settings/navigation.ts +++ b/apps/sim/app/workspace/[workspaceId]/settings/navigation.ts @@ -38,6 +38,7 @@ export type SettingsSection = | 'workflow-mcp-servers' | 'inbox' | 'admin' + | 'mothership' | 'recently-deleted' export type NavigationSection = @@ -169,4 +170,11 @@ export const allNavigationItems: NavigationItem[] = [ section: 'superuser', requiresAdminRole: true, }, + { + id: 'mothership', + label: 'Mothership', + icon: Server, + section: 'superuser', + requiresAdminRole: true, + }, ] diff --git a/apps/sim/hooks/queries/mothership-admin.ts b/apps/sim/hooks/queries/mothership-admin.ts new file mode 100644 index 0000000000..592daea4d8 --- /dev/null +++ b/apps/sim/hooks/queries/mothership-admin.ts @@ -0,0 +1,131 @@ +import { keepPreviousData, useMutation, useQuery } from '@tanstack/react-query' + +export type MothershipEnv = 'dev' | 'staging' | 'prod' + +const BASE = '/api/admin/mothership' + +async function mothershipPost( + endpoint: string, + environment: MothershipEnv, + body?: Record +) { + const res = await fetch(`${BASE}?env=${environment}&endpoint=${endpoint}`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + ...(body ? { body: JSON.stringify(body) } : {}), + }) + if (!res.ok) { + const err = await res.json().catch(() => ({ error: res.statusText })) + throw new Error(err.message || err.error || `Request failed (${res.status})`) + } + return res.json() +} + +async function mothershipGet( + endpoint: string, + environment: MothershipEnv, + params?: Record +) { + const qs = new URLSearchParams({ env: environment, endpoint, ...params }) + const res = await fetch(`${BASE}?${qs.toString()}`, { method: 'GET' }) + if (!res.ok) { + const err = await res.json().catch(() => ({ error: res.statusText })) + throw new Error(err.message || err.error || `Request failed (${res.status})`) + } + return res.json() +} + +export const mothershipKeys = { + all: ['mothership-admin'] as const, + requests: (env: MothershipEnv, start: string, end: string, userId?: string) => + [...mothershipKeys.all, 'requests', env, start, end, userId] as const, + userBreakdown: (env: MothershipEnv, start: string, end: string) => + [...mothershipKeys.all, 'user-breakdown', env, start, end] as const, + licenses: (env: MothershipEnv) => [...mothershipKeys.all, 'licenses', env] as const, + licenseDetails: (env: MothershipEnv, id?: string, name?: string) => + [...mothershipKeys.all, 'license-details', env, id, name] as const, + enterpriseStats: (env: MothershipEnv, customerType: string, start: string, end: string) => + [...mothershipKeys.all, 'enterprise-stats', env, customerType, start, end] as const, + trace: (env: MothershipEnv, requestId: string) => + [...mothershipKeys.all, 'trace', env, requestId] as const, +} + +export function useMothershipRequests( + environment: MothershipEnv, + start: string, + end: string, + userId?: string +) { + return useQuery({ + queryKey: mothershipKeys.requests(environment, start, end, userId), + queryFn: () => + mothershipPost('requests', environment, { + start, + end, + ...(userId ? { userId } : {}), + }), + enabled: !!start && !!end, + placeholderData: keepPreviousData, + }) +} + +export function useMothershipUserBreakdown(environment: MothershipEnv, start: string, end: string) { + return useQuery({ + queryKey: mothershipKeys.userBreakdown(environment, start, end), + queryFn: () => mothershipPost('user-breakdown', environment, { start, end }), + enabled: !!start && !!end, + placeholderData: keepPreviousData, + }) +} + +export function useMothershipLicenses(environment: MothershipEnv) { + return useQuery({ + queryKey: mothershipKeys.licenses(environment), + queryFn: () => mothershipGet('licenses', environment), + }) +} + +export function useMothershipLicenseDetails( + environment: MothershipEnv, + id?: string, + name?: string +) { + return useQuery({ + queryKey: mothershipKeys.licenseDetails(environment, id, name), + queryFn: () => + mothershipPost('licenses/details', environment, { + ...(id ? { id } : {}), + ...(name ? { name } : {}), + }), + enabled: !!(id || name), + }) +} + +export function useGenerateLicense(environment: MothershipEnv) { + return useMutation({ + mutationFn: (params: { name: string; expirationDate?: string }) => + mothershipPost('licenses/generate', environment, params), + }) +} + +export function useMothershipEnterpriseStats( + environment: MothershipEnv, + customerType: string, + start: string, + end: string +) { + return useQuery({ + queryKey: mothershipKeys.enterpriseStats(environment, customerType, start, end), + queryFn: () => mothershipPost('enterprise-stats', environment, { customerType, start, end }), + enabled: !!customerType && !!start && !!end, + placeholderData: keepPreviousData, + }) +} + +export function useMothershipTrace(environment: MothershipEnv, requestId: string) { + return useQuery({ + queryKey: mothershipKeys.trace(environment, requestId), + queryFn: () => mothershipGet('traces', environment, { requestId }), + enabled: !!requestId, + }) +} diff --git a/apps/sim/hooks/queries/tasks.ts b/apps/sim/hooks/queries/tasks.ts index 35faa5200d..4b84635414 100644 --- a/apps/sim/hooks/queries/tasks.ts +++ b/apps/sim/hooks/queries/tasks.ts @@ -1,6 +1,7 @@ import { keepPreviousData, useMutation, useQuery, useQueryClient } from '@tanstack/react-query' import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' import { normalizeMessage } from '@/lib/copilot/chat/persisted-message' +import type { StreamBatchEvent } from '@/lib/copilot/request/session/types' import type { MothershipResource } from '@/app/workspace/[workspaceId]/home/types' export interface TaskMetadata { @@ -17,7 +18,7 @@ export interface TaskChatHistory { messages: PersistedMessage[] activeStreamId: string | null resources: MothershipResource[] - streamSnapshot?: { events: unknown[]; status: string } | null + streamSnapshot?: { events: StreamBatchEvent[]; status: string } | null } export const taskKeys = { diff --git a/apps/sim/lib/copilot/chat/terminal-state.test.ts b/apps/sim/lib/copilot/chat/terminal-state.test.ts new file mode 100644 index 0000000000..b529b3c0c4 --- /dev/null +++ b/apps/sim/lib/copilot/chat/terminal-state.test.ts @@ -0,0 +1,102 @@ +/** + * @vitest-environment node + */ + +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const { selectLimit, selectWhere, selectFrom, select, updateWhere, updateSet, update } = vi.hoisted( + () => { + const selectLimit = vi.fn() + const selectWhere = vi.fn(() => ({ limit: selectLimit })) + const selectFrom = vi.fn(() => ({ where: selectWhere })) + const select = vi.fn(() => ({ from: selectFrom })) + + const updateWhere = vi.fn() + const updateSet = vi.fn(() => ({ where: updateWhere })) + const update = vi.fn(() => ({ set: updateSet })) + + return { + selectLimit, + selectWhere, + selectFrom, + select, + updateWhere, + updateSet, + update, + } + } +) + +vi.mock('@sim/db', () => ({ + db: { + select, + update, + }, +})) + +import { finalizeAssistantTurn } from './terminal-state' + +describe('finalizeAssistantTurn', () => { + beforeEach(() => { + vi.clearAllMocks() + updateWhere.mockResolvedValue(undefined) + }) + + it('appends the assistant message when the user turn is still last', async () => { + selectLimit.mockResolvedValue([ + { + messages: [{ id: 'user-1', role: 'user', content: 'hello' }], + }, + ]) + + await finalizeAssistantTurn({ + chatId: 'chat-1', + userMessageId: 'user-1', + assistantMessage: { + id: 'assistant-1', + role: 'assistant', + content: 'hi', + timestamp: '2024-01-01T00:00:00.000Z', + }, + }) + + expect(updateSet).toHaveBeenCalledWith( + expect.objectContaining({ + updatedAt: expect.any(Date), + conversationId: expect.anything(), + messages: expect.anything(), + }) + ) + }) + + it('only clears the active stream marker when a response is already persisted', async () => { + selectLimit.mockResolvedValue([ + { + messages: [ + { id: 'user-1', role: 'user', content: 'hello' }, + { id: 'assistant-1', role: 'assistant', content: 'partial' }, + ], + }, + ]) + + await finalizeAssistantTurn({ + chatId: 'chat-1', + userMessageId: 'user-1', + assistantMessage: { + id: 'assistant-2', + role: 'assistant', + content: 'final', + timestamp: '2024-01-01T00:00:00.000Z', + }, + }) + + const updateArg = updateSet.mock.calls[0]?.[0] as Record + expect(updateArg).toEqual( + expect.objectContaining({ + updatedAt: expect.any(Date), + conversationId: expect.anything(), + }) + ) + expect(Object.hasOwn(updateArg, 'messages')).toBe(false) + }) +}) diff --git a/apps/sim/lib/copilot/request/lifecycle/start.test.ts b/apps/sim/lib/copilot/request/lifecycle/start.test.ts index de610ca6ab..d8e992381e 100644 --- a/apps/sim/lib/copilot/request/lifecycle/start.test.ts +++ b/apps/sim/lib/copilot/request/lifecycle/start.test.ts @@ -10,6 +10,7 @@ const { createRunSegment, updateRunStatus, resetBuffer, + scheduleBufferCleanup, allocateCursor, appendEvent, cleanupAbortMarker, @@ -20,6 +21,7 @@ const { createRunSegment: vi.fn(), updateRunStatus: vi.fn(), resetBuffer: vi.fn(), + scheduleBufferCleanup: vi.fn(), allocateCursor: vi.fn(), appendEvent: vi.fn(), cleanupAbortMarker: vi.fn(), @@ -27,7 +29,7 @@ const { releasePendingChatStream: vi.fn(), })) -vi.mock('@/lib/copilot/request/lifecycle/continue', () => ({ +vi.mock('@/lib/copilot/request/lifecycle/run', () => ({ runCopilotLifecycle, })) @@ -40,6 +42,7 @@ let mockPublisherController: ReadableStreamDefaultController | null = null vi.mock('@/lib/copilot/request/session', () => ({ resetBuffer, + scheduleBufferCleanup, allocateCursor, appendEvent, cleanupAbortMarker, @@ -107,6 +110,7 @@ describe('createSSEStream terminal error handling', () => { beforeEach(() => { vi.clearAllMocks() resetBuffer.mockResolvedValue(undefined) + scheduleBufferCleanup.mockResolvedValue(undefined) allocateCursor .mockResolvedValueOnce({ seq: 1, cursor: '1' }) .mockResolvedValueOnce({ seq: 2, cursor: '2' }) @@ -149,6 +153,7 @@ describe('createSSEStream terminal error handling', () => { type: MothershipStreamV1EventType.error, }) ) + expect(scheduleBufferCleanup).toHaveBeenCalledWith('stream-1') }) it('writes the thrown terminal error event before close for replay durability', async () => { @@ -175,5 +180,6 @@ describe('createSSEStream terminal error handling', () => { type: MothershipStreamV1EventType.error, }) ) + expect(scheduleBufferCleanup).toHaveBeenCalledWith('stream-1') }) }) diff --git a/apps/sim/lib/copilot/request/lifecycle/start.ts b/apps/sim/lib/copilot/request/lifecycle/start.ts index ed006a9898..34454db99c 100644 --- a/apps/sim/lib/copilot/request/lifecycle/start.ts +++ b/apps/sim/lib/copilot/request/lifecycle/start.ts @@ -18,6 +18,7 @@ import { releasePendingChatStream, resetBuffer, StreamWriter, + scheduleBufferCleanup, startAbortPoller, unregisterActiveStream, } from '@/lib/copilot/request/session' @@ -205,6 +206,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS if (chatId) { await releasePendingChatStream(chatId, streamId) } + await scheduleBufferCleanup(streamId) await cleanupAbortMarker(streamId) const trace = collector.build({ diff --git a/apps/sim/lib/copilot/request/session/buffer.test.ts b/apps/sim/lib/copilot/request/session/buffer.test.ts index 9ce631bd16..05d70ee24e 100644 --- a/apps/sim/lib/copilot/request/session/buffer.test.ts +++ b/apps/sim/lib/copilot/request/session/buffer.test.ts @@ -106,7 +106,13 @@ vi.mock('@/lib/core/config/redis', () => ({ getRedisClient: () => mockRedis, })) -import { allocateCursor, appendEvent, readEvents } from '@/lib/copilot/request/session/buffer' +import { + allocateCursor, + appendEvent, + clearBuffer, + readEvents, + scheduleBufferCleanup, +} from '@/lib/copilot/request/session/buffer' describe('mothership-stream-outbox', () => { beforeEach(() => { @@ -114,7 +120,7 @@ describe('mothership-stream-outbox', () => { vi.clearAllMocks() }) - it.concurrent('replays envelopes after a given cursor', async () => { + it('replays envelopes after a given cursor', async () => { const firstCursor = await allocateCursor('stream-1') const secondCursor = await allocateCursor('stream-1') @@ -145,4 +151,66 @@ describe('mothership-stream-outbox', () => { const replayed = await readEvents('stream-1', '1') expect(replayed.map((entry) => entry.payload.text)).toEqual(['world']) }) + + it('does not trim active stream history while appending events', async () => { + const cursor = await allocateCursor('stream-1') + + await appendEvent( + createEvent({ + streamId: 'stream-1', + cursor: cursor.cursor, + seq: cursor.seq, + requestId: 'req-1', + type: MothershipStreamV1EventType.text, + payload: { channel: MothershipStreamV1TextChannel.assistant, text: 'hello' }, + }) + ) + + expect(mockRedis.zremrangebyrank).not.toHaveBeenCalled() + }) + + it('clears persisted stream state during teardown cleanup', async () => { + const cursor = await allocateCursor('stream-1') + + await appendEvent( + createEvent({ + streamId: 'stream-1', + cursor: cursor.cursor, + seq: cursor.seq, + requestId: 'req-1', + type: MothershipStreamV1EventType.text, + payload: { channel: MothershipStreamV1TextChannel.assistant, text: 'hello' }, + }) + ) + + expect((await readEvents('stream-1', '0')).length).toBe(1) + + await clearBuffer('stream-1') + + expect(await readEvents('stream-1', '0')).toEqual([]) + }) + + it('shortens completed stream retention without deleting replay data immediately', async () => { + const cursor = await allocateCursor('stream-1') + + await appendEvent( + createEvent({ + streamId: 'stream-1', + cursor: cursor.cursor, + seq: cursor.seq, + requestId: 'req-1', + type: MothershipStreamV1EventType.text, + payload: { channel: MothershipStreamV1TextChannel.assistant, text: 'hello' }, + }) + ) + + await scheduleBufferCleanup('stream-1', 30) + + expect(mockRedis.expire).toHaveBeenCalledWith('mothership_stream:stream-1:events', 30) + expect(mockRedis.expire).toHaveBeenCalledWith('mothership_stream:stream-1:seq', 30) + expect(mockRedis.expire).toHaveBeenCalledWith('mothership_stream:stream-1:abort', 30) + expect((await readEvents('stream-1', '0')).map((entry) => entry.payload.text)).toEqual([ + 'hello', + ]) + }) }) diff --git a/apps/sim/lib/copilot/request/session/buffer.ts b/apps/sim/lib/copilot/request/session/buffer.ts index 89e661366c..4e5d3cd7a5 100644 --- a/apps/sim/lib/copilot/request/session/buffer.ts +++ b/apps/sim/lib/copilot/request/session/buffer.ts @@ -7,6 +7,7 @@ const logger = createLogger('SessionBuffer') const STREAM_OUTBOX_PREFIX = 'mothership_stream:' const DEFAULT_TTL_SECONDS = 60 * 60 +const DEFAULT_COMPLETED_TTL_SECONDS = 5 * 60 const DEFAULT_EVENT_LIMIT = 5_000 const RETRY_DELAYS_MS = [0, 50, 150] as const @@ -97,11 +98,36 @@ export async function allocateCursor(streamId: string): Promise<{ } export async function resetBuffer(streamId: string): Promise { - await withRedisRetry({ operation: 'reset_outbox', streamId }, async (redis) => { + await clearBuffer(streamId, 'reset_outbox') +} + +export async function clearBuffer(streamId: string, operation = 'clear_outbox'): Promise { + await withRedisRetry({ operation, streamId }, async (redis) => { await redis.del(getEventsKey(streamId), getSeqKey(streamId), getAbortKey(streamId)) }) } +export async function scheduleBufferCleanup( + streamId: string, + ttlSeconds = DEFAULT_COMPLETED_TTL_SECONDS +): Promise { + try { + await withRedisRetry({ operation: 'schedule_outbox_cleanup', streamId }, async (redis) => { + const pipeline = redis.pipeline() + pipeline.expire(getEventsKey(streamId), ttlSeconds) + pipeline.expire(getSeqKey(streamId), ttlSeconds) + pipeline.expire(getAbortKey(streamId), ttlSeconds) + await pipeline.exec() + }) + } catch (error) { + logger.warn('Failed to shorten stream buffer TTL during cleanup', { + streamId, + ttlSeconds, + error: error instanceof Error ? error.message : String(error), + }) + } +} + export async function appendEvents( envelopes: MothershipStreamV1EventEnvelope[] ): Promise { @@ -123,9 +149,6 @@ export async function appendEvents( pipeline.zadd(key, ...(zaddArgs as [number, string, ...Array])) pipeline.expire(key, config.ttlSeconds) pipeline.set(seqKey, String(envelopes[envelopes.length - 1].seq), 'EX', config.ttlSeconds) - if (config.eventLimit > 0) { - pipeline.zremrangebyrank(key, 0, -config.eventLimit - 1) - } await pipeline.exec() }) diff --git a/apps/sim/lib/copilot/request/session/index.ts b/apps/sim/lib/copilot/request/session/index.ts index 6a6eddd1ee..61502be65b 100644 --- a/apps/sim/lib/copilot/request/session/index.ts +++ b/apps/sim/lib/copilot/request/session/index.ts @@ -14,12 +14,14 @@ export { appendEvent, appendEvents, clearAbortMarker, + clearBuffer, getLatestSeq, getOldestSeq, hasAbortMarker, InvalidCursorError, readEvents, resetBuffer, + scheduleBufferCleanup, writeAbortMarker, } from './buffer' export { createEvent, eventToStreamEvent, isEventRecord, TOOL_CALL_STATUS } from './event' diff --git a/apps/sim/lib/copilot/request/session/types.ts b/apps/sim/lib/copilot/request/session/types.ts index 1cd8b7af5f..f53c334b64 100644 --- a/apps/sim/lib/copilot/request/session/types.ts +++ b/apps/sim/lib/copilot/request/session/types.ts @@ -1,4 +1,5 @@ import type { + MothershipStreamV1EventEnvelope, MothershipStreamV1EventType, MothershipStreamV1StreamScope, } from '@/lib/copilot/generated/mothership-stream-v1' @@ -8,3 +9,17 @@ export interface StreamEvent { payload: Record scope?: MothershipStreamV1StreamScope } + +export interface StreamBatchEvent { + eventId: number + streamId: string + event: MothershipStreamV1EventEnvelope +} + +export function toStreamBatchEvent(envelope: MothershipStreamV1EventEnvelope): StreamBatchEvent { + return { + eventId: envelope.seq, + streamId: envelope.stream.streamId, + event: envelope, + } +} diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts index 3d180e5e62..bef5712fab 100644 --- a/apps/sim/lib/core/config/env.ts +++ b/apps/sim/lib/core/config/env.ts @@ -144,6 +144,12 @@ export const env = createEnv({ // Admin API ADMIN_API_KEY: z.string().min(32).optional(), // Admin API key for self-hosted GitOps access (generate with: openssl rand -hex 32) + // Mothership Admin + MOTHERSHIP_API_ADMIN_KEY: z.string().min(1).optional(), // Admin API key for mothership/copilot admin endpoints + MOTHERSHIP_DEV_URL: z.string().url().optional(), // Mothership dev environment URL + MOTHERSHIP_STAGING_URL: z.string().url().optional(), // Mothership staging environment URL + MOTHERSHIP_PROD_URL: z.string().url().optional(), // Mothership production environment URL + // Infrastructure & Deployment NEXT_RUNTIME: z.string().optional(), // Next.js runtime environment DOCKER_BUILD: z.boolean().optional(), // Flag indicating Docker build environment