v0.5.38: snap to grid, copilot ux improvements, billing line items

This commit is contained in:
Waleed
2025-12-20 17:24:38 -08:00
committed by GitHub
44 changed files with 26265 additions and 152 deletions

View File

@@ -3,6 +3,7 @@ import { userStats } from '@sim/db/schema'
import { eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { logModelUsage } from '@/lib/billing/core/usage-log'
import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
import { checkInternalApiKey } from '@/lib/copilot/utils'
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
@@ -14,6 +15,9 @@ const logger = createLogger('BillingUpdateCostAPI')
const UpdateCostSchema = z.object({
userId: z.string().min(1, 'User ID is required'),
cost: z.number().min(0, 'Cost must be a non-negative number'),
model: z.string().min(1, 'Model is required'),
inputTokens: z.number().min(0).default(0),
outputTokens: z.number().min(0).default(0),
})
/**
@@ -71,11 +75,12 @@ export async function POST(req: NextRequest) {
)
}
const { userId, cost } = validation.data
const { userId, cost, model, inputTokens, outputTokens } = validation.data
logger.info(`[${requestId}] Processing cost update`, {
userId,
cost,
model,
})
// Check if user stats record exists (same as ExecutionLogger)
@@ -107,6 +112,16 @@ export async function POST(req: NextRequest) {
addedCost: cost,
})
// Log usage for complete audit trail
await logModelUsage({
userId,
source: 'copilot',
model,
inputTokens,
outputTokens,
cost,
})
// Check if user has hit overage threshold and bill incrementally
await checkAndBillOverageThreshold(userId)

View File

@@ -1,6 +1,6 @@
import { randomUUID } from 'crypto'
import { db } from '@sim/db'
import { chat } from '@sim/db/schema'
import { chat, workflow } from '@sim/db/schema'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
@@ -94,6 +94,21 @@ export async function POST(
if (!deployment.isActive) {
logger.warn(`[${requestId}] Chat is not active: ${identifier}`)
const [workflowRecord] = await db
.select({ workspaceId: workflow.workspaceId })
.from(workflow)
.where(eq(workflow.id, deployment.workflowId))
.limit(1)
const workspaceId = workflowRecord?.workspaceId
if (!workspaceId) {
logger.warn(`[${requestId}] Cannot log: workflow ${deployment.workflowId} has no workspace`)
return addCorsHeaders(
createErrorResponse('This chat is currently unavailable', 403),
request
)
}
const executionId = randomUUID()
const loggingSession = new LoggingSession(
deployment.workflowId,
@@ -104,7 +119,7 @@ export async function POST(
await loggingSession.safeStart({
userId: deployment.userId,
workspaceId: '', // Will be resolved if needed
workspaceId,
variables: {},
})
@@ -169,7 +184,14 @@ export async function POST(
const { actorUserId, workflowRecord } = preprocessResult
const workspaceOwnerId = actorUserId!
const workspaceId = workflowRecord?.workspaceId || ''
const workspaceId = workflowRecord?.workspaceId
if (!workspaceId) {
logger.error(`[${requestId}] Workflow ${deployment.workflowId} has no workspaceId`)
return addCorsHeaders(
createErrorResponse('Workflow has no associated workspace', 500),
request
)
}
try {
const selectedOutputs: string[] = []

View File

@@ -57,7 +57,7 @@ export async function GET(request: NextRequest) {
workflowName: workflow.name,
}
let conditions: SQL | undefined = eq(workflow.workspaceId, params.workspaceId)
let conditions: SQL | undefined = eq(workflowExecutionLogs.workspaceId, params.workspaceId)
if (params.level && params.level !== 'all') {
const levels = params.level.split(',').filter(Boolean)
@@ -134,7 +134,7 @@ export async function GET(request: NextRequest) {
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.entityId, workflowExecutionLogs.workspaceId),
eq(permissions.userId, userId)
)
)

View File

@@ -130,6 +130,8 @@ export async function GET(request: NextRequest) {
deploymentVersionName: sql<null>`NULL`,
}
const workspaceFilter = eq(workflowExecutionLogs.workspaceId, params.workspaceId)
const baseQuery = db
.select(selectColumns)
.from(workflowExecutionLogs)
@@ -141,18 +143,12 @@ export async function GET(request: NextRequest) {
workflowDeploymentVersion,
eq(workflowDeploymentVersion.id, workflowExecutionLogs.deploymentVersionId)
)
.innerJoin(
workflow,
and(
eq(workflowExecutionLogs.workflowId, workflow.id),
eq(workflow.workspaceId, params.workspaceId)
)
)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.entityId, workflowExecutionLogs.workspaceId),
eq(permissions.userId, userId)
)
)
@@ -300,7 +296,7 @@ export async function GET(request: NextRequest) {
}
const logs = await baseQuery
.where(conditions)
.where(and(workspaceFilter, conditions))
.orderBy(desc(workflowExecutionLogs.startedAt))
.limit(params.limit)
.offset(params.offset)
@@ -312,22 +308,16 @@ export async function GET(request: NextRequest) {
pausedExecutions,
eq(pausedExecutions.executionId, workflowExecutionLogs.executionId)
)
.innerJoin(
workflow,
and(
eq(workflowExecutionLogs.workflowId, workflow.id),
eq(workflow.workspaceId, params.workspaceId)
)
)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.entityId, workflowExecutionLogs.workspaceId),
eq(permissions.userId, userId)
)
)
.where(conditions)
.where(and(eq(workflowExecutionLogs.workspaceId, params.workspaceId), conditions))
const countResult = await countQuery

View File

@@ -1,5 +1,5 @@
import { db } from '@sim/db'
import { permissions, workflow, workflowExecutionLogs } from '@sim/db/schema'
import { permissions, workflowExecutionLogs } from '@sim/db/schema'
import { and, eq, isNotNull, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
@@ -42,23 +42,17 @@ export async function GET(request: NextRequest) {
trigger: workflowExecutionLogs.trigger,
})
.from(workflowExecutionLogs)
.innerJoin(
workflow,
and(
eq(workflowExecutionLogs.workflowId, workflow.id),
eq(workflow.workspaceId, params.workspaceId)
)
)
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.entityId, workflowExecutionLogs.workspaceId),
eq(permissions.userId, userId)
)
)
.where(
and(
eq(workflowExecutionLogs.workspaceId, params.workspaceId),
isNotNull(workflowExecutionLogs.trigger),
sql`${workflowExecutionLogs.trigger} NOT IN ('api', 'manual', 'webhook', 'chat', 'schedule')`
)

View File

@@ -26,9 +26,9 @@ const SettingsSchema = z.object({
showTrainingControls: z.boolean().optional(),
superUserModeEnabled: z.boolean().optional(),
errorNotificationsEnabled: z.boolean().optional(),
snapToGridSize: z.number().min(0).max(50).optional(),
})
// Default settings values
const defaultSettings = {
theme: 'system',
autoConnect: true,
@@ -38,6 +38,7 @@ const defaultSettings = {
showTrainingControls: false,
superUserModeEnabled: false,
errorNotificationsEnabled: true,
snapToGridSize: 0,
}
export async function GET() {
@@ -46,7 +47,6 @@ export async function GET() {
try {
const session = await getSession()
// Return default settings for unauthenticated users instead of 401 error
if (!session?.user?.id) {
logger.info(`[${requestId}] Returning default settings for unauthenticated user`)
return NextResponse.json({ data: defaultSettings }, { status: 200 })
@@ -72,13 +72,13 @@ export async function GET() {
showTrainingControls: userSettings.showTrainingControls ?? false,
superUserModeEnabled: userSettings.superUserModeEnabled ?? true,
errorNotificationsEnabled: userSettings.errorNotificationsEnabled ?? true,
snapToGridSize: userSettings.snapToGridSize ?? 0,
},
},
{ status: 200 }
)
} catch (error: any) {
logger.error(`[${requestId}] Settings fetch error`, error)
// Return default settings on error instead of error response
return NextResponse.json({ data: defaultSettings }, { status: 200 })
}
}
@@ -89,7 +89,6 @@ export async function PATCH(request: Request) {
try {
const session = await getSession()
// Return success for unauthenticated users instead of error
if (!session?.user?.id) {
logger.info(
`[${requestId}] Settings update attempted by unauthenticated user - acknowledged without saving`
@@ -103,7 +102,6 @@ export async function PATCH(request: Request) {
try {
const validatedData = SettingsSchema.parse(body)
// Store the settings
await db
.insert(settings)
.values({
@@ -135,7 +133,6 @@ export async function PATCH(request: Request) {
}
} catch (error: any) {
logger.error(`[${requestId}] Settings update error`, error)
// Return success on error instead of error response
return NextResponse.json({ success: true }, { status: 200 })
}
}

View File

@@ -0,0 +1,105 @@
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { getUserUsageLogs, type UsageLogSource } from '@/lib/billing/core/usage-log'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('UsageLogsAPI')

// Query-string contract for GET /api/users/me/usage-logs.
// `limit` arrives as a string from the URL, so it is coerced to a number;
// `period` defaults to the trailing 30-day window when omitted.
const QuerySchema = z.object({
  source: z.enum(['workflow', 'wand', 'copilot']).optional(),
  workspaceId: z.string().optional(),
  period: z.enum(['1d', '7d', '30d', 'all']).optional().default('30d'),
  limit: z.coerce.number().min(1).max(100).optional().default(50),
  cursor: z.string().optional(),
})
/**
 * GET /api/users/me/usage-logs
 *
 * Returns paginated usage logs for the authenticated user, optionally
 * filtered by source, workspace, and a trailing time window.
 *
 * Responses: 200 with `{ success, logs, pagination }`, 401 when
 * unauthenticated, 400 on invalid query params, 500 on unexpected failure.
 */
export async function GET(req: NextRequest) {
  try {
    const auth = await checkHybridAuth(req, { requireWorkflowId: false })
    if (!auth.success || !auth.userId) {
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }
    const userId = auth.userId

    // Normalize raw params: `|| undefined` maps both null and empty-string
    // values to undefined so zod optionals/defaults kick in.
    const { searchParams } = new URL(req.url)
    const validation = QuerySchema.safeParse({
      source: searchParams.get('source') || undefined,
      workspaceId: searchParams.get('workspaceId') || undefined,
      period: searchParams.get('period') || '30d',
      limit: searchParams.get('limit') || '50',
      cursor: searchParams.get('cursor') || undefined,
    })
    if (!validation.success) {
      return NextResponse.json(
        {
          error: 'Invalid query parameters',
          details: validation.error.issues,
        },
        { status: 400 }
      )
    }
    const { source, workspaceId, period, limit, cursor } = validation.data

    // Translate the period shorthand into an explicit [startDate, endDate]
    // window; 'all' leaves startDate undefined (unbounded).
    const endDate = new Date()
    let startDate: Date | undefined
    if (period !== 'all') {
      const daysBack = period === '1d' ? 1 : period === '7d' ? 7 : 30
      startDate = new Date()
      startDate.setDate(startDate.getDate() - daysBack)
    }

    const result = await getUserUsageLogs(userId, {
      source: source as UsageLogSource | undefined,
      workspaceId,
      startDate,
      endDate,
      limit,
      cursor,
    })

    logger.debug('Retrieved usage logs', {
      userId,
      source,
      period,
      logCount: result.logs.length,
      hasMore: result.pagination.hasMore,
    })

    // Spread preserves the helper's { logs, pagination } shape on the wire.
    return NextResponse.json({
      success: true,
      ...result,
    })
  } catch (error) {
    logger.error('Failed to get usage logs', {
      error: error instanceof Error ? error.message : String(error),
    })
    return NextResponse.json(
      {
        error: 'Failed to retrieve usage logs',
      },
      { status: 500 }
    )
  }
}

View File

@@ -25,8 +25,7 @@ export interface LogFilters {
export function buildLogFilters(filters: LogFilters): SQL<unknown> {
const conditions: SQL<unknown>[] = []
// Required: workspace and permissions check
conditions.push(eq(workflow.workspaceId, filters.workspaceId))
conditions.push(eq(workflowExecutionLogs.workspaceId, filters.workspaceId))
// Cursor-based pagination
if (filters.cursor) {

View File

@@ -105,7 +105,6 @@ export async function GET(request: NextRequest) {
const conditions = buildLogFilters(filters)
const orderBy = getOrderBy(params.order)
// Build and execute query
const baseQuery = db
.select({
id: workflowExecutionLogs.id,
@@ -124,13 +123,7 @@ export async function GET(request: NextRequest) {
workflowDescription: workflow.description,
})
.from(workflowExecutionLogs)
.innerJoin(
workflow,
and(
eq(workflowExecutionLogs.workflowId, workflow.id),
eq(workflow.workspaceId, params.workspaceId)
)
)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
@@ -197,11 +190,8 @@ export async function GET(request: NextRequest) {
return result
})
// Get user's workflow execution limits and usage
const limits = await getUserLimits(userId)
// Create response with limits information
// The rateLimit object from checkRateLimit is for THIS API endpoint's rate limits
const response = createApiResponse(
{
data: formattedLogs,

View File

@@ -4,6 +4,7 @@ import { eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import OpenAI, { AzureOpenAI } from 'openai'
import { getSession } from '@/lib/auth'
import { logModelUsage } from '@/lib/billing/core/usage-log'
import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
import { env } from '@/lib/core/config/env'
import { getCostMultiplier, isBillingEnabled } from '@/lib/core/config/feature-flags'
@@ -88,7 +89,7 @@ async function updateUserStatsForWand(
try {
const [workflowRecord] = await db
.select({ userId: workflow.userId })
.select({ userId: workflow.userId, workspaceId: workflow.workspaceId })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
@@ -101,6 +102,7 @@ async function updateUserStatsForWand(
}
const userId = workflowRecord.userId
const workspaceId = workflowRecord.workspaceId
const totalTokens = usage.total_tokens || 0
const promptTokens = usage.prompt_tokens || 0
const completionTokens = usage.completion_tokens || 0
@@ -137,6 +139,17 @@ async function updateUserStatsForWand(
costAdded: costToStore,
})
await logModelUsage({
userId,
source: 'wand',
model: modelName,
inputTokens: promptTokens,
outputTokens: completionTokens,
cost: costToStore,
workspaceId: workspaceId ?? undefined,
workflowId,
})
await checkAndBillOverageThreshold(userId)
} catch (error) {
logger.error(`[${requestId}] Failed to update user stats for wand usage`, error)

View File

@@ -409,10 +409,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const actorUserId = preprocessResult.actorUserId!
const workflow = preprocessResult.workflowRecord!
if (!workflow.workspaceId) {
logger.error(`[${requestId}] Workflow ${workflowId} has no workspaceId`)
return NextResponse.json({ error: 'Workflow has no associated workspace' }, { status: 500 })
}
const workspaceId = workflow.workspaceId
logger.info(`[${requestId}] Preprocessing passed`, {
workflowId,
actorUserId,
workspaceId: workflow.workspaceId,
workspaceId,
})
if (isAsyncMode) {
@@ -460,7 +466,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
)
const executionContext = {
workspaceId: workflow.workspaceId || '',
workspaceId,
workflowId,
executionId,
}
@@ -478,7 +484,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflow.workspaceId || '',
workspaceId,
variables: {},
})
@@ -507,7 +513,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
requestId,
executionId,
workflowId,
workspaceId: workflow.workspaceId ?? undefined,
workspaceId,
userId: actorUserId,
sessionUserId: isClientSession ? userId : undefined,
workflowUserId: workflow.userId,
@@ -589,7 +595,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workflow: {
id: workflow.id,
userId: actorUserId,
workspaceId: workflow.workspaceId,
workspaceId,
isDeployed: workflow.isDeployed,
variables: (workflow as any).variables,
},
@@ -775,7 +781,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
requestId,
executionId,
workflowId,
workspaceId: workflow.workspaceId ?? undefined,
workspaceId,
userId: actorUserId,
sessionUserId: isClientSession ? userId : undefined,
workflowUserId: workflow.userId,

View File

@@ -70,7 +70,11 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
const loggingSession = new LoggingSession(id, executionId, triggerType, requestId)
const userId = accessValidation.workflow.userId
const workspaceId = accessValidation.workflow.workspaceId || ''
const workspaceId = accessValidation.workflow.workspaceId
if (!workspaceId) {
logger.error(`[${requestId}] Workflow ${id} has no workspaceId`)
return createErrorResponse('Workflow has no associated workspace', 500)
}
await loggingSession.safeStart({
userId,

View File

@@ -2,3 +2,4 @@ export * from './file-display'
export { default as CopilotMarkdownRenderer } from './markdown-renderer'
export * from './smooth-streaming'
export * from './thinking-block'
export * from './usage-limit-actions'

View File

@@ -0,0 +1,99 @@
'use client'
import { useState } from 'react'
import { Loader2 } from 'lucide-react'
import { Button } from '@/components/emcn'
import { canEditUsageLimit } from '@/lib/billing/subscriptions/utils'
import { isHosted } from '@/lib/core/config/feature-flags'
import { useSubscriptionData, useUpdateUsageLimit } from '@/hooks/queries/subscription'
import { useCopilotStore } from '@/stores/panel/copilot/store'
const LIMIT_INCREMENTS = [0, 50, 100] as const

/** Rounds a dollar amount up to the next multiple of 50; exact multiples (and 0) are unchanged. */
function roundUpToNearest50(value: number): number {
  const wholeSteps = Math.ceil(value / 50)
  return wholeSteps * 50
}
/**
 * Inline action buttons shown on a copilot "usage limit reached" message.
 *
 * Hosted users who may edit their limit get three one-click raise options
 * (current limit rounded up to the next $50, plus +$50 and +$100); choosing
 * one updates the limit and automatically retries the last user message.
 * Everyone else gets a single "Upgrade" button.
 */
export function UsageLimitActions() {
  const { data: subscriptionData } = useSubscriptionData()
  const updateUsageLimitMutation = useUpdateUsageLimit()
  const subscription = subscriptionData?.data
  const canEdit = subscription ? canEditUsageLimit(subscription) : false
  // Which option is mid-flight (drives the per-button spinner).
  const [selectedAmount, setSelectedAmount] = useState<number | null>(null)
  // Set after a successful update so the buttons disappear once handled.
  const [isHidden, setIsHidden] = useState(false)
  const currentLimit = subscription?.usage_limit ?? 0
  // `|| 50` keeps a sane floor when the current limit is 0/unknown.
  const baseLimit = roundUpToNearest50(currentLimit) || 50
  const limitOptions = LIMIT_INCREMENTS.map((increment) => baseLimit + increment)
  const handleUpdateLimit = async (newLimit: number) => {
    setSelectedAmount(newLimit)
    try {
      await updateUsageLimitMutation.mutateAsync({ limit: newLimit })
      setIsHidden(true)
      // Retry the conversation: drop the usage-limit error message(s) and
      // resend the most recent user message with its original attachments.
      const { messages, sendMessage } = useCopilotStore.getState()
      const lastUserMessage = [...messages].reverse().find((m) => m.role === 'user')
      if (lastUserMessage) {
        const filteredMessages = messages.filter(
          (m) => !(m.role === 'assistant' && m.errorType === 'usage_limit')
        )
        useCopilotStore.setState({ messages: filteredMessages })
        await sendMessage(lastUserMessage.content, {
          fileAttachments: lastUserMessage.fileAttachments,
          contexts: lastUserMessage.contexts,
          messageId: lastUserMessage.id,
        })
      }
    } catch {
      // Update failed: surface the buttons again so the user can retry.
      setIsHidden(false)
    } finally {
      setSelectedAmount(null)
    }
  }
  const handleNavigateToUpgrade = () => {
    if (isHosted) {
      // Open the in-app subscription settings tab.
      window.dispatchEvent(new CustomEvent('open-settings', { detail: { tab: 'subscription' } }))
    } else {
      window.open('https://www.sim.ai', '_blank')
    }
  }
  if (isHidden) {
    return null
  }
  // Self-hosted or non-editable plans can only upgrade, not raise the limit.
  if (!isHosted || !canEdit) {
    return (
      <Button onClick={handleNavigateToUpgrade} variant='default'>
        Upgrade
      </Button>
    )
  }
  return (
    <>
      {limitOptions.map((limit) => {
        const isLoading = updateUsageLimitMutation.isPending && selectedAmount === limit
        const isDisabled = updateUsageLimitMutation.isPending
        return (
          <Button
            key={limit}
            onClick={() => handleUpdateLimit(limit)}
            disabled={isDisabled}
            variant='default'
          >
            {isLoading ? <Loader2 className='mr-1 h-3 w-3 animate-spin' /> : null}${limit}
          </Button>
        )
      })}
    </>
  )
}

View File

@@ -9,6 +9,7 @@ import {
SmoothStreamingText,
StreamingIndicator,
ThinkingBlock,
UsageLimitActions,
} from '@/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/components/copilot-message/components'
import CopilotMarkdownRenderer from '@/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/components/copilot-message/components/markdown-renderer'
import {
@@ -458,6 +459,12 @@ const CopilotMessage: FC<CopilotMessageProps> = memo(
<StreamingIndicator />
)}
{message.errorType === 'usage_limit' && (
<div className='mt-3 flex gap-1.5'>
<UsageLimitActions />
</div>
)}
{/* Action buttons for completed messages */}
{!isStreaming && cleanTextContent && (
<div className='flex items-center gap-[8px] pt-[8px]'>

View File

@@ -214,6 +214,7 @@ export function Code({
const handleStreamStartRef = useRef<() => void>(() => {})
const handleGeneratedContentRef = useRef<(generatedCode: string) => void>(() => {})
const handleStreamChunkRef = useRef<(chunk: string) => void>(() => {})
const hasEditedSinceFocusRef = useRef(false)
// Custom hooks
const accessiblePrefixes = useAccessibleReferencePrefixes(blockId)
@@ -504,6 +505,7 @@ export function Code({
setCode(newValue)
setStoreValue(newValue)
hasEditedSinceFocusRef.current = true
const newCursorPosition = dropPosition + 1
setCursorPosition(newCursorPosition)
@@ -533,6 +535,7 @@ export function Code({
if (!isPreview && !readOnly) {
setCode(newValue)
emitTagSelection(newValue)
hasEditedSinceFocusRef.current = true
}
setShowTags(false)
setActiveSourceBlockId(null)
@@ -550,6 +553,7 @@ export function Code({
if (!isPreview && !readOnly) {
setCode(newValue)
emitTagSelection(newValue)
hasEditedSinceFocusRef.current = true
}
setShowEnvVars(false)
@@ -741,6 +745,7 @@ export function Code({
value={code}
onValueChange={(newCode) => {
if (!isAiStreaming && !isPreview && !disabled && !readOnly) {
hasEditedSinceFocusRef.current = true
setCode(newCode)
setStoreValue(newCode)
@@ -769,6 +774,12 @@ export function Code({
if (isAiStreaming) {
e.preventDefault()
}
if (e.key === 'z' && (e.metaKey || e.ctrlKey) && !hasEditedSinceFocusRef.current) {
e.preventDefault()
}
}}
onFocus={() => {
hasEditedSinceFocusRef.current = false
}}
highlight={createHighlightFunction(effectiveLanguage, shouldHighlightReference)}
{...getCodeEditorProps({ isStreaming: isAiStreaming, isPreview, disabled })}

View File

@@ -92,7 +92,6 @@ const edgeTypes: EdgeTypes = {
/** ReactFlow configuration constants. */
const defaultEdgeOptions = { type: 'custom' }
const snapGrid: [number, number] = [20, 20]
const reactFlowFitViewOptions = { padding: 0.6 } as const
const reactFlowProOptions = { hideAttribution: true } as const
@@ -160,6 +159,14 @@ const WorkflowContent = React.memo(() => {
// Training modal state
const showTrainingModal = useCopilotTrainingStore((state) => state.showModal)
// Snap to grid settings
const snapToGridSize = useGeneralStore((state) => state.snapToGridSize)
const snapToGrid = snapToGridSize > 0
const snapGrid: [number, number] = useMemo(
() => [snapToGridSize, snapToGridSize],
[snapToGridSize]
)
// Handle copilot stream cleanup on page unload and component unmount
useStreamCleanup(copilotCleanup)
@@ -2311,7 +2318,7 @@ const WorkflowContent = React.memo(() => {
onNodeDrag={effectivePermissions.canEdit ? onNodeDrag : undefined}
onNodeDragStop={effectivePermissions.canEdit ? onNodeDragStop : undefined}
onNodeDragStart={effectivePermissions.canEdit ? onNodeDragStart : undefined}
snapToGrid={false}
snapToGrid={snapToGrid}
snapGrid={snapGrid}
elevateEdgesOnSelect={true}
onlyRenderVisibleElements={false}

View File

@@ -12,6 +12,7 @@ import {
ModalContent,
ModalFooter,
ModalHeader,
Slider,
Switch,
} from '@/components/emcn'
import { Input, Skeleton } from '@/components/ui'
@@ -76,6 +77,9 @@ export function General({ onOpenChange }: GeneralProps) {
const [uploadError, setUploadError] = useState<string | null>(null)
const [localSnapValue, setLocalSnapValue] = useState<number | null>(null)
const snapToGridValue = localSnapValue ?? settings?.snapToGridSize ?? 0
useEffect(() => {
if (profile?.name) {
setName(profile.name)
@@ -234,6 +238,18 @@ export function General({ onOpenChange }: GeneralProps) {
}
}
const handleSnapToGridChange = (value: number[]) => {
setLocalSnapValue(value[0])
}
const handleSnapToGridCommit = async (value: number[]) => {
const newValue = value[0]
if (newValue !== settings?.snapToGridSize && !updateSetting.isPending) {
await updateSetting.mutateAsync({ key: 'snapToGridSize', value: newValue })
}
setLocalSnapValue(null)
}
const handleTrainingControlsChange = async (checked: boolean) => {
if (checked !== settings?.showTrainingControls && !updateSetting.isPending) {
await updateSetting.mutateAsync({ key: 'showTrainingControls', value: checked })
@@ -393,7 +409,6 @@ export function General({ onOpenChange }: GeneralProps) {
dropdownWidth={140}
value={settings?.theme}
onChange={handleThemeChange}
disabled={updateSetting.isPending}
placeholder='Select theme'
options={[
{ label: 'System', value: 'system' },
@@ -410,17 +425,34 @@ export function General({ onOpenChange }: GeneralProps) {
id='auto-connect'
checked={settings?.autoConnect ?? true}
onCheckedChange={handleAutoConnectChange}
disabled={updateSetting.isPending}
/>
</div>
<div className='flex items-center justify-between'>
<Label htmlFor='snap-to-grid'>Snap to grid</Label>
<div className='flex items-center gap-[12px]'>
<span className='w-[32px] text-right text-[12px] text-[var(--text-tertiary)]'>
{snapToGridValue === 0 ? 'Off' : `${snapToGridValue}px`}
</span>
<Slider
id='snap-to-grid'
value={[snapToGridValue]}
onValueChange={handleSnapToGridChange}
onValueCommit={handleSnapToGridCommit}
min={0}
max={50}
step={1}
className='w-[100px]'
/>
</div>
</div>
<div className='flex items-center justify-between'>
<Label htmlFor='error-notifications'>Run error notifications</Label>
<Switch
id='error-notifications'
checked={settings?.errorNotificationsEnabled ?? true}
onCheckedChange={handleErrorNotificationsChange}
disabled={updateSetting.isPending}
/>
</div>
@@ -430,7 +462,6 @@ export function General({ onOpenChange }: GeneralProps) {
id='telemetry'
checked={settings?.telemetryEnabled ?? true}
onCheckedChange={handleTelemetryToggle}
disabled={updateSetting.isPending}
/>
</div>
@@ -446,7 +477,6 @@ export function General({ onOpenChange }: GeneralProps) {
id='training-controls'
checked={settings?.showTrainingControls ?? false}
onCheckedChange={handleTrainingControlsChange}
disabled={updateSetting.isPending}
/>
</div>
)}
@@ -458,7 +488,6 @@ export function General({ onOpenChange }: GeneralProps) {
id='super-user-mode'
checked={settings?.superUserModeEnabled ?? true}
onCheckedChange={handleSuperUserModeToggle}
disabled={updateSetting.isPending}
/>
</div>
)}
@@ -534,6 +563,15 @@ function GeneralSkeleton() {
<Skeleton className='h-[17px] w-[30px] rounded-full' />
</div>
{/* Snap to grid row */}
<div className='flex items-center justify-between'>
<Skeleton className='h-4 w-24' />
<div className='flex items-center gap-[12px]'>
<Skeleton className='h-3 w-[32px]' />
<Skeleton className='h-[6px] w-[100px] rounded-[20px]' />
</div>
</div>
{/* Error notifications row */}
<div className='flex items-center justify-between'>
<Skeleton className='h-4 w-40' />

View File

@@ -209,11 +209,16 @@ async function runWorkflowExecution({
const mergedStates = mergeSubblockState(blocks)
const workspaceId = workflowRecord.workspaceId
if (!workspaceId) {
throw new Error(`Workflow ${payload.workflowId} has no associated workspace`)
}
const personalEnvUserId = workflowRecord.userId
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
personalEnvUserId,
workflowRecord.workspaceId || undefined
workspaceId
)
const variables = EnvVarsSchema.parse({
@@ -232,7 +237,7 @@ async function runWorkflowExecution({
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflowRecord.workspaceId || '',
workspaceId,
variables: variables || {},
deploymentVersionId,
})
@@ -241,7 +246,7 @@ async function runWorkflowExecution({
requestId,
executionId,
workflowId: payload.workflowId,
workspaceId: workflowRecord.workspaceId || '',
workspaceId,
userId: actorUserId,
sessionUserId: undefined,
workflowUserId: workflowRecord.userId,

View File

@@ -164,7 +164,10 @@ async function executeWebhookJobInternal(
.from(workflowTable)
.where(eq(workflowTable.id, payload.workflowId))
.limit(1)
const workspaceId = wfRows[0]?.workspaceId || undefined
const workspaceId = wfRows[0]?.workspaceId
if (!workspaceId) {
throw new Error(`Workflow ${payload.workflowId} has no associated workspace`)
}
const workflowVariables = (wfRows[0]?.variables as Record<string, any>) || {}
// Merge subblock states (matching workflow-execution pattern)
@@ -298,7 +301,7 @@ async function executeWebhookJobInternal(
// Start logging session so the complete call has a log entry to update
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: workspaceId || '',
workspaceId,
variables: {},
triggerData: {
isTest: payload.testMode === true,
@@ -356,7 +359,7 @@ async function executeWebhookJobInternal(
// Start logging session so the complete call has a log entry to update
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: workspaceId || '',
workspaceId,
variables: {},
triggerData: {
isTest: payload.testMode === true,
@@ -398,7 +401,7 @@ async function executeWebhookJobInternal(
if (triggerConfig.outputs) {
logger.debug(`[${requestId}] Processing trigger ${resolvedTriggerId} file outputs`)
const processedInput = await processTriggerFileOutputs(input, triggerConfig.outputs, {
workspaceId: workspaceId || '',
workspaceId,
workflowId: payload.workflowId,
executionId,
requestId,
@@ -431,7 +434,7 @@ async function executeWebhookJobInternal(
if (fileFields.length > 0 && typeof input === 'object' && input !== null) {
const executionContext = {
workspaceId: workspaceId || '',
workspaceId,
workflowId: payload.workflowId,
executionId,
}
@@ -542,9 +545,23 @@ async function executeWebhookJobInternal(
})
try {
const wfRow = await db
.select({ workspaceId: workflowTable.workspaceId })
.from(workflowTable)
.where(eq(workflowTable.id, payload.workflowId))
.limit(1)
const errorWorkspaceId = wfRow[0]?.workspaceId
if (!errorWorkspaceId) {
logger.warn(
`[${requestId}] Cannot log error: workflow ${payload.workflowId} has no workspace`
)
throw error
}
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: '', // May not be available for early errors
workspaceId: errorWorkspaceId,
variables: {},
triggerData: {
isTest: payload.testMode === true,

View File

@@ -59,7 +59,10 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
}
const actorUserId = preprocessResult.actorUserId!
const workspaceId = preprocessResult.workflowRecord?.workspaceId || undefined
const workspaceId = preprocessResult.workflowRecord?.workspaceId
if (!workspaceId) {
throw new Error(`Workflow ${workflowId} has no associated workspace`)
}
logger.info(`[${requestId}] Preprocessing passed. Using actor: ${actorUserId}`)

View File

@@ -67,6 +67,7 @@ export {
SModalSidebarSectionTitle,
SModalTrigger,
} from './s-modal/s-modal'
export { Slider, type SliderProps } from './slider/slider'
export { Switch } from './switch/switch'
export { Textarea } from './textarea/textarea'
export { Tooltip } from './tooltip/tooltip'

View File

@@ -0,0 +1,39 @@
'use client'
import * as React from 'react'
import * as SliderPrimitive from '@radix-ui/react-slider'
import { cn } from '@/lib/core/utils/cn'
export interface SliderProps extends React.ComponentPropsWithoutRef<typeof SliderPrimitive.Root> {}
/**
* EMCN Slider component built on Radix UI Slider primitive.
* Styled to match the Switch component with thin track design.
*
* @example
* ```tsx
* <Slider value={[50]} onValueChange={setValue} min={0} max={100} step={10} />
* ```
*/
// Thin styled wrapper over Radix's Slider root; forwards the ref and all
// Radix props (value, onValueChange, onValueCommit, min/max/step, ...).
const Slider = React.forwardRef<React.ElementRef<typeof SliderPrimitive.Root>, SliderProps>(
  ({ className, ...props }, ref) => (
    <SliderPrimitive.Root
      ref={ref}
      className={cn(
        'relative flex w-full touch-none select-none items-center',
        'disabled:cursor-not-allowed disabled:opacity-50',
        className
      )}
      {...props}
    >
      <SliderPrimitive.Track className='relative h-[6px] w-full grow overflow-hidden rounded-[20px] bg-[var(--surface-12)] transition-colors'>
        <SliderPrimitive.Range className='absolute h-full bg-[var(--surface-12)]' />
      </SliderPrimitive.Track>
      <SliderPrimitive.Thumb className='block h-[14px] w-[14px] cursor-pointer rounded-full bg-[var(--text-primary)] shadow-sm transition-colors focus-visible:outline-none' />
    </SliderPrimitive.Root>
  )
)
// Keep the Radix display name so React DevTools shows a meaningful label.
Slider.displayName = SliderPrimitive.Root.displayName
export { Slider }

View File

@@ -94,12 +94,19 @@ export function serializePauseSnapshot(
dagIncomingEdges,
}
const workspaceId = metadataFromContext?.workspaceId ?? context.workspaceId
if (!workspaceId) {
throw new Error(
`Cannot serialize pause snapshot: missing workspaceId for workflow ${context.workflowId}`
)
}
const executionMetadata: ExecutionMetadata = {
requestId:
metadataFromContext?.requestId ?? context.executionId ?? context.workflowId ?? 'unknown',
executionId: context.executionId ?? 'unknown',
workflowId: context.workflowId,
workspaceId: context.workspaceId,
workspaceId,
userId: metadataFromContext?.userId ?? '',
sessionUserId: metadataFromContext?.sessionUserId,
workflowUserId: metadataFromContext?.workflowUserId,

View File

@@ -5,7 +5,7 @@ export interface ExecutionMetadata {
requestId: string
executionId: string
workflowId: string
workspaceId?: string
workspaceId: string
userId: string
sessionUserId?: string
workflowUserId?: string

View File

@@ -1,4 +1,3 @@
import { useEffect } from 'react'
import { keepPreviousData, useMutation, useQuery, useQueryClient } from '@tanstack/react-query'
import { syncThemeToNextThemes } from '@/lib/core/utils/theme'
import { createLogger } from '@/lib/logs/console/logger'
@@ -25,6 +24,7 @@ export interface GeneralSettings {
telemetryEnabled: boolean
billingUsageNotificationsEnabled: boolean
errorNotificationsEnabled: boolean
snapToGridSize: number
}
/**
@@ -49,18 +49,20 @@ async function fetchGeneralSettings(): Promise<GeneralSettings> {
telemetryEnabled: data.telemetryEnabled ?? true,
billingUsageNotificationsEnabled: data.billingUsageNotificationsEnabled ?? true,
errorNotificationsEnabled: data.errorNotificationsEnabled ?? true,
snapToGridSize: data.snapToGridSize ?? 0,
}
}
/**
* Sync React Query cache to Zustand store and next-themes.
* This ensures the rest of the app (which uses Zustand) stays in sync.
* Uses shallow comparison to prevent unnecessary updates and flickering.
* @param settings - The general settings to sync
*/
function syncSettingsToZustand(settings: GeneralSettings) {
const { setSettings } = useGeneralStore.getState()
const store = useGeneralStore.getState()
setSettings({
const newSettings = {
isAutoConnectEnabled: settings.autoConnect,
showTrainingControls: settings.showTrainingControls,
superUserModeEnabled: settings.superUserModeEnabled,
@@ -68,30 +70,35 @@ function syncSettingsToZustand(settings: GeneralSettings) {
telemetryEnabled: settings.telemetryEnabled,
isBillingUsageNotificationsEnabled: settings.billingUsageNotificationsEnabled,
isErrorNotificationsEnabled: settings.errorNotificationsEnabled,
})
snapToGridSize: settings.snapToGridSize,
}
const hasChanges = Object.entries(newSettings).some(
([key, value]) => store[key as keyof typeof newSettings] !== value
)
if (hasChanges) {
store.setSettings(newSettings)
}
syncThemeToNextThemes(settings.theme)
}
/**
* Hook to fetch general settings.
* Also syncs to Zustand store to keep the rest of the app in sync.
* Syncs to Zustand store only on successful fetch (not on cache updates from mutations).
*/
export function useGeneralSettings() {
const query = useQuery({
return useQuery({
queryKey: generalSettingsKeys.settings(),
queryFn: fetchGeneralSettings,
queryFn: async () => {
const settings = await fetchGeneralSettings()
syncSettingsToZustand(settings)
return settings
},
staleTime: 60 * 60 * 1000,
placeholderData: keepPreviousData,
})
useEffect(() => {
if (query.data) {
syncSettingsToZustand(query.data)
}
}, [query.data])
return query
}
/**
@@ -131,8 +138,8 @@ export function useUpdateGeneralSetting() {
...previousSettings,
[key]: value,
}
queryClient.setQueryData<GeneralSettings>(generalSettingsKeys.settings(), newSettings)
queryClient.setQueryData<GeneralSettings>(generalSettingsKeys.settings(), newSettings)
syncSettingsToZustand(newSettings)
}
@@ -145,8 +152,5 @@ export function useUpdateGeneralSetting() {
}
logger.error('Failed to update setting:', err)
},
onSuccess: (_data, _variables, _context) => {
queryClient.invalidateQueries({ queryKey: generalSettingsKeys.settings() })
},
})
}

View File

@@ -0,0 +1,421 @@
import { db } from '@sim/db'
import { usageLog, workflow } from '@sim/db/schema'
import { and, desc, eq, gte, lte, sql } from 'drizzle-orm'
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('UsageLog')
/**
* Usage log category types
*/
export type UsageLogCategory = 'model' | 'fixed'
/**
* Usage log source types
*/
export type UsageLogSource = 'workflow' | 'wand' | 'copilot'
/**
* Metadata for 'model' category charges
*/
export interface ModelUsageMetadata {
inputTokens: number
outputTokens: number
}
/**
* Metadata for 'fixed' category charges (currently empty, extensible)
*/
export type FixedUsageMetadata = Record<string, never>
/**
* Union type for all metadata types
*/
export type UsageLogMetadata = ModelUsageMetadata | FixedUsageMetadata | null
/**
* Parameters for logging model usage (token-based charges)
*/
export interface LogModelUsageParams {
userId: string
source: UsageLogSource
model: string
inputTokens: number
outputTokens: number
cost: number
workspaceId?: string
workflowId?: string
executionId?: string
}
/**
* Parameters for logging fixed charges (flat fees)
*/
export interface LogFixedUsageParams {
userId: string
source: UsageLogSource
description: string
cost: number
workspaceId?: string
workflowId?: string
executionId?: string
}
/**
 * Record a token-based ('model') usage charge in the audit log.
 *
 * No-ops when billing is disabled. Never throws: a failed audit write is
 * logged and swallowed so it cannot interrupt the calling billing flow.
 *
 * @param params - User, source, model name, token counts, and cost in USD
 */
export async function logModelUsage(params: LogModelUsageParams): Promise<void> {
  if (!isBillingEnabled) {
    return
  }

  // Token counts are stored as category-specific metadata on the row.
  const tokenMetadata: ModelUsageMetadata = {
    inputTokens: params.inputTokens,
    outputTokens: params.outputTokens,
  }

  const row = {
    id: crypto.randomUUID(),
    userId: params.userId,
    category: 'model' as const,
    source: params.source,
    description: params.model,
    metadata: tokenMetadata,
    // Cost column is numeric; serialize the JS number to avoid float driver quirks.
    cost: params.cost.toString(),
    workspaceId: params.workspaceId ?? null,
    workflowId: params.workflowId ?? null,
    executionId: params.executionId ?? null,
  }

  try {
    await db.insert(usageLog).values(row)
    logger.debug('Logged model usage', {
      userId: params.userId,
      source: params.source,
      model: params.model,
      cost: params.cost,
    })
  } catch (error) {
    logger.error('Failed to log model usage', {
      error: error instanceof Error ? error.message : String(error),
      params,
    })
    // Swallowed intentionally - usage logging should not break the main flow
  }
}
/**
 * Record a flat-fee ('fixed') usage charge (e.g. base execution charge or search).
 *
 * No-ops when billing is disabled. Never throws: a failed audit write is
 * logged and swallowed so it cannot interrupt the calling billing flow.
 *
 * @param params - User, source, human-readable charge description, and cost in USD
 */
export async function logFixedUsage(params: LogFixedUsageParams): Promise<void> {
  if (!isBillingEnabled) {
    return
  }

  const { userId, source, description, cost, workspaceId, workflowId, executionId } = params

  try {
    await db.insert(usageLog).values({
      id: crypto.randomUUID(),
      userId,
      category: 'fixed',
      source,
      description,
      // Fixed charges carry no category-specific metadata.
      metadata: null,
      cost: cost.toString(),
      workspaceId: workspaceId ?? null,
      workflowId: workflowId ?? null,
      executionId: executionId ?? null,
    })
    logger.debug('Logged fixed usage', {
      userId,
      source,
      description,
      cost,
    })
  } catch (error) {
    logger.error('Failed to log fixed usage', {
      error: error instanceof Error ? error.message : String(error),
      params,
    })
    // Swallowed intentionally - usage logging should not break the main flow
  }
}
/**
* Parameters for batch logging workflow usage
*/
export interface LogWorkflowUsageBatchParams {
userId: string
workspaceId?: string
workflowId: string
executionId?: string
baseExecutionCharge?: number
models?: Record<
string,
{
total: number
tokens: { prompt: number; completion: number }
}
>
}
/**
 * Log all usage entries for one workflow execution in a single batch insert.
 *
 * Builds at most one 'fixed' row (the base execution fee, when positive) plus
 * one 'model' row per model that accrued a positive cost, then inserts them
 * together for performance. No-ops when billing is disabled or when nothing
 * is chargeable. Never throws.
 *
 * @param params - User/workflow context plus the base charge and per-model costs
 */
export async function logWorkflowUsageBatch(params: LogWorkflowUsageBatchParams): Promise<void> {
  if (!isBillingEnabled) {
    return
  }

  type UsageRow = {
    id: string
    userId: string
    category: 'model' | 'fixed'
    source: 'workflow'
    description: string
    metadata: ModelUsageMetadata | null
    cost: string
    workspaceId: string | null
    workflowId: string | null
    executionId: string | null
  }

  // Context columns shared by every row in this batch.
  const shared = {
    userId: params.userId,
    source: 'workflow' as const,
    workspaceId: params.workspaceId ?? null,
    workflowId: params.workflowId,
    executionId: params.executionId ?? null,
  }

  const rows: UsageRow[] = []

  // Flat per-execution fee, only when one actually applies.
  if (params.baseExecutionCharge && params.baseExecutionCharge > 0) {
    rows.push({
      ...shared,
      id: crypto.randomUUID(),
      category: 'fixed',
      description: 'execution_fee',
      metadata: null,
      cost: params.baseExecutionCharge.toString(),
    })
  }

  // One row per model that accrued a positive cost; zero-cost models are skipped.
  for (const [modelName, modelData] of Object.entries(params.models ?? {})) {
    if (modelData.total <= 0) {
      continue
    }
    rows.push({
      ...shared,
      id: crypto.randomUUID(),
      category: 'model',
      description: modelName,
      metadata: {
        inputTokens: modelData.tokens.prompt,
        outputTokens: modelData.tokens.completion,
      },
      cost: modelData.total.toString(),
    })
  }

  if (rows.length === 0) {
    return
  }

  try {
    await db.insert(usageLog).values(rows)
    logger.debug('Logged workflow usage batch', {
      userId: params.userId,
      workflowId: params.workflowId,
      entryCount: rows.length,
    })
  } catch (error) {
    logger.error('Failed to log workflow usage batch', {
      error: error instanceof Error ? error.message : String(error),
      params,
    })
    // Swallowed intentionally - usage logging should not break the main flow
  }
}
/**
* Options for querying usage logs
*/
export interface GetUsageLogsOptions {
/** Filter by source */
source?: UsageLogSource
/** Filter by workspace */
workspaceId?: string
/** Start date (inclusive) */
startDate?: Date
/** End date (inclusive) */
endDate?: Date
/** Maximum number of results */
limit?: number
/** Cursor for pagination (log ID) */
cursor?: string
}
/**
* Usage log entry returned from queries
*/
export interface UsageLogEntry {
id: string
createdAt: string
category: UsageLogCategory
source: UsageLogSource
description: string
metadata?: UsageLogMetadata
cost: number
workspaceId?: string
workflowId?: string
executionId?: string
}
/**
* Result from getUserUsageLogs
*/
export interface UsageLogsResult {
logs: UsageLogEntry[]
summary: {
totalCost: number
bySource: Record<string, number>
}
pagination: {
nextCursor?: string
hasMore: boolean
}
}
/**
 * Get usage logs for a user with optional filtering and keyset pagination.
 *
 * Returns one page of logs (newest first, ordered by createdAt then id),
 * a per-source cost summary over the full filtered range (the summary
 * deliberately ignores the pagination cursor), and a cursor for the next page.
 *
 * @param userId - User whose logs to fetch
 * @param options - Optional source/workspace/date filters, page size, and cursor
 * @returns Logs, cost summary, and pagination info
 * @throws Re-throws any database error after logging it
 */
export async function getUserUsageLogs(
  userId: string,
  options: GetUsageLogsOptions = {}
): Promise<UsageLogsResult> {
  const { source, workspaceId, startDate, endDate, limit = 50, cursor } = options

  try {
    // Build the shared filter set ONCE; both the page query and the summary
    // aggregate use it, so the two can never drift apart.
    const baseConditions = [eq(usageLog.userId, userId)]
    if (source) {
      baseConditions.push(eq(usageLog.source, source))
    }
    if (workspaceId) {
      baseConditions.push(eq(usageLog.workspaceId, workspaceId))
    }
    if (startDate) {
      baseConditions.push(gte(usageLog.createdAt, startDate))
    }
    if (endDate) {
      baseConditions.push(lte(usageLog.createdAt, endDate))
    }

    // The page query additionally applies the keyset cursor on (createdAt, id).
    // An unknown cursor id is silently ignored, matching first-page behavior.
    const pageConditions = [...baseConditions]
    if (cursor) {
      const cursorLog = await db
        .select({ createdAt: usageLog.createdAt })
        .from(usageLog)
        .where(eq(usageLog.id, cursor))
        .limit(1)

      if (cursorLog.length > 0) {
        pageConditions.push(
          sql`(${usageLog.createdAt} < ${cursorLog[0].createdAt} OR (${usageLog.createdAt} = ${cursorLog[0].createdAt} AND ${usageLog.id} < ${cursor}))`
        )
      }
    }

    // The page fetch and the summary aggregate are independent - run them in
    // parallel. Fetch limit+1 rows to detect whether another page exists.
    const [logs, summaryResult] = await Promise.all([
      db
        .select()
        .from(usageLog)
        .where(and(...pageConditions))
        .orderBy(desc(usageLog.createdAt), desc(usageLog.id))
        .limit(limit + 1),
      db
        .select({
          source: usageLog.source,
          totalCost: sql<string>`SUM(${usageLog.cost})`,
        })
        .from(usageLog)
        .where(and(...baseConditions))
        .groupBy(usageLog.source),
    ])

    const hasMore = logs.length > limit
    const resultLogs = hasMore ? logs.slice(0, limit) : logs

    // Convert DB rows to the API shape, omitting null/absent optional fields.
    const transformedLogs: UsageLogEntry[] = resultLogs.map((log) => ({
      id: log.id,
      createdAt: log.createdAt.toISOString(),
      category: log.category as UsageLogCategory,
      source: log.source as UsageLogSource,
      description: log.description,
      ...(log.metadata ? { metadata: log.metadata as UsageLogMetadata } : {}),
      cost: Number.parseFloat(log.cost),
      ...(log.workspaceId ? { workspaceId: log.workspaceId } : {}),
      ...(log.workflowId ? { workflowId: log.workflowId } : {}),
      ...(log.executionId ? { executionId: log.executionId } : {}),
    }))

    const bySource: Record<string, number> = {}
    let totalCost = 0
    for (const row of summaryResult) {
      const sourceCost = Number.parseFloat(row.totalCost || '0')
      bySource[row.source] = sourceCost
      totalCost += sourceCost
    }

    return {
      logs: transformedLogs,
      summary: {
        totalCost,
        bySource,
      },
      pagination: {
        nextCursor:
          hasMore && resultLogs.length > 0 ? resultLogs[resultLogs.length - 1].id : undefined,
        hasMore,
      },
    }
  } catch (error) {
    logger.error('Failed to get usage logs', {
      error: error instanceof Error ? error.message : String(error),
      userId,
      options,
    })
    throw error
  }
}
/**
 * Look up the owning user ID for a workflow.
 *
 * Helper for call sites that only hold a workflow ID. Returns null when the
 * workflow does not exist or when the lookup fails (errors are logged, not
 * re-thrown).
 *
 * @param workflowId - Workflow to resolve
 * @returns The owner's user ID, or null
 */
export async function getUserIdFromWorkflow(workflowId: string): Promise<string | null> {
  try {
    const rows = await db
      .select({ userId: workflow.userId })
      .from(workflow)
      .where(eq(workflow.id, workflowId))
      .limit(1)
    return rows[0]?.userId ?? null
  } catch (error) {
    logger.error('Failed to get user ID from workflow', {
      error: error instanceof Error ? error.message : String(error),
      workflowId,
    })
    return null
  }
}

View File

@@ -516,6 +516,15 @@ async function logPreprocessingError(params: {
loggingSession,
} = params
if (!workspaceId) {
logger.warn(`[${requestId}] Cannot log preprocessing error: no workspaceId available`, {
workflowId,
executionId,
errorMessage,
})
return
}
try {
const session =
loggingSession || new LoggingSession(workflowId, executionId, triggerType as any, requestId)

View File

@@ -14,6 +14,7 @@ import {
getOrgUsageLimit,
maybeSendUsageThresholdEmail,
} from '@/lib/billing/core/usage'
import { logWorkflowUsageBatch } from '@/lib/billing/core/usage-log'
import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
import { redactApiKeys } from '@/lib/core/security/redaction'
@@ -46,43 +47,6 @@ export interface ToolCall {
const logger = createLogger('ExecutionLogger')
export class ExecutionLogger implements IExecutionLoggerService {
private mergeTraceSpans(existing: TraceSpan[], additional: TraceSpan[]): TraceSpan[] {
// If no existing spans, just return additional
if (!existing || existing.length === 0) return additional
if (!additional || additional.length === 0) return existing
// Find the root "Workflow Execution" span in both arrays
const existingRoot = existing.find((s) => s.name === 'Workflow Execution')
const additionalRoot = additional.find((s) => s.name === 'Workflow Execution')
if (!existingRoot || !additionalRoot) {
// If we can't find both roots, just concatenate (fallback)
return [...existing, ...additional]
}
// Calculate the full duration from original start to resume end
const startTime = existingRoot.startTime
const endTime = additionalRoot.endTime || existingRoot.endTime
const fullDuration =
startTime && endTime
? new Date(endTime).getTime() - new Date(startTime).getTime()
: (existingRoot.duration || 0) + (additionalRoot.duration || 0)
// Merge the children of the workflow execution spans
const mergedRoot = {
...existingRoot,
children: [...(existingRoot.children || []), ...(additionalRoot.children || [])],
endTime,
duration: fullDuration,
}
// Return array with merged root plus any other top-level spans
const otherExisting = existing.filter((s) => s.name !== 'Workflow Execution')
const otherAdditional = additional.filter((s) => s.name !== 'Workflow Execution')
return [mergedRoot, ...otherExisting, ...otherAdditional]
}
private mergeCostModels(
existing: Record<string, any>,
additional: Record<string, any>
@@ -109,6 +73,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
async startWorkflowExecution(params: {
workflowId: string
workspaceId: string
executionId: string
trigger: ExecutionTrigger
environment: ExecutionEnvironment
@@ -118,8 +83,15 @@ export class ExecutionLogger implements IExecutionLoggerService {
workflowLog: WorkflowExecutionLog
snapshot: WorkflowExecutionSnapshot
}> {
const { workflowId, executionId, trigger, environment, workflowState, deploymentVersionId } =
params
const {
workflowId,
workspaceId,
executionId,
trigger,
environment,
workflowState,
deploymentVersionId,
} = params
logger.debug(`Starting workflow execution ${executionId} for workflow ${workflowId}`)
@@ -168,6 +140,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
.values({
id: uuidv4(),
workflowId,
workspaceId,
executionId,
stateSnapshotId: snapshotResult.snapshot.id,
deploymentVersionId: deploymentVersionId ?? null,
@@ -374,7 +347,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
await this.updateUserStats(
updatedLog.workflowId,
costSummary,
updatedLog.trigger as ExecutionTrigger['type']
updatedLog.trigger as ExecutionTrigger['type'],
executionId
)
const limit = before.usageData.limit
@@ -411,7 +385,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
await this.updateUserStats(
updatedLog.workflowId,
costSummary,
updatedLog.trigger as ExecutionTrigger['type']
updatedLog.trigger as ExecutionTrigger['type'],
executionId
)
const percentBefore =
@@ -436,14 +411,16 @@ export class ExecutionLogger implements IExecutionLoggerService {
await this.updateUserStats(
updatedLog.workflowId,
costSummary,
updatedLog.trigger as ExecutionTrigger['type']
updatedLog.trigger as ExecutionTrigger['type'],
executionId
)
}
} else {
await this.updateUserStats(
updatedLog.workflowId,
costSummary,
updatedLog.trigger as ExecutionTrigger['type']
updatedLog.trigger as ExecutionTrigger['type'],
executionId
)
}
} catch (e) {
@@ -451,7 +428,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
await this.updateUserStats(
updatedLog.workflowId,
costSummary,
updatedLog.trigger as ExecutionTrigger['type']
updatedLog.trigger as ExecutionTrigger['type'],
executionId
)
} catch {}
logger.warn('Usage threshold notification check failed (non-fatal)', { error: e })
@@ -524,8 +502,18 @@ export class ExecutionLogger implements IExecutionLoggerService {
totalCompletionTokens: number
baseExecutionCharge: number
modelCost: number
models?: Record<
string,
{
input: number
output: number
total: number
tokens: { prompt: number; completion: number; total: number }
}
>
},
trigger: ExecutionTrigger['type']
trigger: ExecutionTrigger['type'],
executionId?: string
): Promise<void> {
if (!isBillingEnabled) {
logger.debug('Billing is disabled, skipping user stats cost update')
@@ -597,6 +585,16 @@ export class ExecutionLogger implements IExecutionLoggerService {
addedTokens: costSummary.totalTokens,
})
// Log usage entries for auditing (batch insert for performance)
await logWorkflowUsageBatch({
userId,
workspaceId: workflowRecord.workspaceId ?? undefined,
workflowId,
executionId,
baseExecutionCharge: costSummary.baseExecutionCharge,
models: costSummary.models,
})
// Check if user has hit overage threshold and bill incrementally
await checkAndBillOverageThreshold(userId)
} catch (error) {

View File

@@ -18,7 +18,7 @@ const logger = createLogger('LoggingSession')
export interface SessionStartParams {
userId?: string
workspaceId?: string
workspaceId: string
variables?: Record<string, string>
triggerData?: Record<string, unknown>
skipLogCreation?: boolean // For resume executions - reuse existing log entry
@@ -65,7 +65,7 @@ export class LoggingSession {
this.requestId = requestId
}
async start(params: SessionStartParams = {}): Promise<void> {
async start(params: SessionStartParams): Promise<void> {
const { userId, workspaceId, variables, triggerData, skipLogCreation, deploymentVersionId } =
params
@@ -84,6 +84,7 @@ export class LoggingSession {
if (!skipLogCreation) {
await executionLogger.startWorkflowExecution({
workflowId: this.workflowId,
workspaceId,
executionId: this.executionId,
trigger: this.trigger,
environment: this.environment,
@@ -115,7 +116,6 @@ export class LoggingSession {
* Note: Logging now works through trace spans only, no direct executor integration needed
*/
setupExecutor(executor: any): void {
// No longer setting logger on executor - trace spans handle everything
if (this.requestId) {
logger.debug(`[${this.requestId}] Logging session ready for execution ${this.executionId}`)
}
@@ -272,7 +272,7 @@ export class LoggingSession {
}
}
async safeStart(params: SessionStartParams = {}): Promise<boolean> {
async safeStart(params: SessionStartParams): Promise<boolean> {
try {
await this.start(params)
return true
@@ -305,6 +305,7 @@ export class LoggingSession {
await executionLogger.startWorkflowExecution({
workflowId: this.workflowId,
workspaceId,
executionId: this.executionId,
trigger: this.trigger,
environment: this.environment,

View File

@@ -334,6 +334,7 @@ export interface SnapshotCreationResult {
export interface ExecutionLoggerService {
startWorkflowExecution(params: {
workflowId: string
workspaceId: string
executionId: string
trigger: ExecutionTrigger
environment: ExecutionEnvironment

View File

@@ -103,6 +103,9 @@ export async function executeWorkflowCore(
const { onBlockStart, onBlockComplete, onStream, onExecutorCreated } = callbacks
const providedWorkspaceId = metadata.workspaceId
if (!providedWorkspaceId) {
throw new Error(`Execution metadata missing workspaceId for workflow ${workflowId}`)
}
let processedInput = input || {}

View File

@@ -533,7 +533,11 @@ function createStreamingMessage(): CopilotMessage {
}
}
function createErrorMessage(messageId: string, content: string): CopilotMessage {
function createErrorMessage(
messageId: string,
content: string,
errorType?: 'usage_limit' | 'unauthorized' | 'forbidden' | 'rate_limit' | 'upgrade_required'
): CopilotMessage {
return {
id: messageId,
role: 'assistant',
@@ -546,6 +550,7 @@ function createErrorMessage(messageId: string, content: string): CopilotMessage
timestamp: Date.now(),
},
],
errorType,
}
}
@@ -2066,23 +2071,35 @@ export const useCopilotStore = create<CopilotStore>()(
// Check for specific status codes and provide custom messages
let errorContent = result.error || 'Failed to send message'
let errorType:
| 'usage_limit'
| 'unauthorized'
| 'forbidden'
| 'rate_limit'
| 'upgrade_required'
| undefined
if (result.status === 401) {
errorContent =
'_Unauthorized request. You need a valid API key to use the copilot. You can get one by going to [sim.ai](https://sim.ai) settings and generating one there._'
errorType = 'unauthorized'
} else if (result.status === 402) {
errorContent =
'_Usage limit exceeded. To continue using this service, upgrade your plan or top up on credits._'
'_Usage limit exceeded. To continue using this service, upgrade your plan or increase your usage limit to:_'
errorType = 'usage_limit'
} else if (result.status === 403) {
errorContent =
'_Provider config not allowed for non-enterprise users. Please remove the provider config and try again_'
errorType = 'forbidden'
} else if (result.status === 426) {
errorContent =
'_Please upgrade to the latest version of the Sim platform to continue using the copilot._'
errorType = 'upgrade_required'
} else if (result.status === 429) {
errorContent = '_Provider rate limit exceeded. Please try again later._'
errorType = 'rate_limit'
}
const errorMessage = createErrorMessage(streamingMessage.id, errorContent)
const errorMessage = createErrorMessage(streamingMessage.id, errorContent, errorType)
set((state) => ({
messages: state.messages.map((m) => (m.id === streamingMessage.id ? errorMessage : m)),
error: errorContent,

View File

@@ -39,6 +39,7 @@ export interface CopilotMessage {
>
fileAttachments?: MessageFileAttachment[]
contexts?: ChatContext[]
errorType?: 'usage_limit' | 'unauthorized' | 'forbidden' | 'rate_limit' | 'upgrade_required'
}
// Contexts attached to a user message

View File

@@ -13,6 +13,7 @@ const initialState: General = {
telemetryEnabled: true,
isBillingUsageNotificationsEnabled: true,
isErrorNotificationsEnabled: true,
snapToGridSize: 0,
}
export const useGeneralStore = create<GeneralStore>()(

View File

@@ -6,6 +6,7 @@ export interface General {
telemetryEnabled: boolean
isBillingUsageNotificationsEnabled: boolean
isErrorNotificationsEnabled: boolean
snapToGridSize: number
}
export interface GeneralStore extends General {
@@ -21,4 +22,5 @@ export type UserSettings = {
telemetryEnabled: boolean
isBillingUsageNotificationsEnabled: boolean
errorNotificationsEnabled: boolean
snapToGridSize: number
}

View File

@@ -0,0 +1,31 @@
-- Step 1: Add column as NULLABLE first (instant, no lock)
ALTER TABLE "workflow_execution_logs" ADD COLUMN IF NOT EXISTS "workspace_id" text;--> statement-breakpoint
-- Step 2: Backfill workspace_id from workflow table
UPDATE "workflow_execution_logs" wel
SET "workspace_id" = w."workspace_id"
FROM "workflow" w
WHERE wel."workflow_id" = w."id"
AND w."workspace_id" IS NOT NULL
AND wel."workspace_id" IS NULL;--> statement-breakpoint
-- Step 3: Delete orphaned execution logs (from workflows without workspaces)
DELETE FROM "workflow_execution_logs"
WHERE "workspace_id" IS NULL;--> statement-breakpoint
-- Step 4: Add NOT NULL constraint (safe now - all remaining rows have values)
ALTER TABLE "workflow_execution_logs" ALTER COLUMN "workspace_id" SET NOT NULL;--> statement-breakpoint
-- Step 5: Add foreign key constraint
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint WHERE conname = 'workflow_execution_logs_workspace_id_workspace_id_fk'
) THEN
ALTER TABLE "workflow_execution_logs" ADD CONSTRAINT "workflow_execution_logs_workspace_id_workspace_id_fk"
FOREIGN KEY ("workspace_id") REFERENCES "public"."workspace"("id") ON DELETE cascade ON UPDATE no action;
END IF;
END $$;--> statement-breakpoint
-- Step 6: Create indexes
CREATE INDEX IF NOT EXISTS "workflow_execution_logs_workspace_started_at_idx" ON "workflow_execution_logs" USING btree ("workspace_id","started_at");

View File

@@ -0,0 +1,23 @@
-- Usage-log audit tables: one row per billable operation (token charge or flat fee).
CREATE TYPE "public"."usage_log_category" AS ENUM('model', 'fixed');--> statement-breakpoint
CREATE TYPE "public"."usage_log_source" AS ENUM('workflow', 'wand', 'copilot');--> statement-breakpoint
-- "description" holds the model name for 'model' rows, the charge type for 'fixed' rows.
-- "metadata" carries category-specific detail (token counts); "cost" is unconstrained numeric.
CREATE TABLE "usage_log" (
	"id" text PRIMARY KEY NOT NULL,
	"user_id" text NOT NULL,
	"category" "usage_log_category" NOT NULL,
	"source" "usage_log_source" NOT NULL,
	"description" text NOT NULL,
	"metadata" jsonb,
	"cost" numeric NOT NULL,
	"workspace_id" text,
	"workflow_id" text,
	"execution_id" text,
	"created_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
-- Logs are deleted with their user, but only detached (SET NULL) from workspace/workflow
-- so billing history survives workspace or workflow deletion.
ALTER TABLE "usage_log" ADD CONSTRAINT "usage_log_user_id_user_id_fk" FOREIGN KEY ("user_id") REFERENCES "public"."user"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "usage_log" ADD CONSTRAINT "usage_log_workspace_id_workspace_id_fk" FOREIGN KEY ("workspace_id") REFERENCES "public"."workspace"("id") ON DELETE set null ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "usage_log" ADD CONSTRAINT "usage_log_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE set null ON UPDATE no action;--> statement-breakpoint
-- Composite (user_id, created_at) index backs the primary "user's usage history" query.
CREATE INDEX "usage_log_user_created_at_idx" ON "usage_log" USING btree ("user_id","created_at");--> statement-breakpoint
CREATE INDEX "usage_log_source_idx" ON "usage_log" USING btree ("source");--> statement-breakpoint
CREATE INDEX "usage_log_workspace_id_idx" ON "usage_log" USING btree ("workspace_id");--> statement-breakpoint
CREATE INDEX "usage_log_workflow_id_idx" ON "usage_log" USING btree ("workflow_id");

View File

@@ -0,0 +1 @@
-- Canvas preference: 0 = snapping off, non-zero = grid size (see settings.snapToGridSize).
ALTER TABLE "settings" ADD COLUMN "snap_to_grid_size" integer DEFAULT 0 NOT NULL;

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -883,6 +883,27 @@
"when": 1766203036010,
"tag": "0126_dapper_midnight",
"breakpoints": true
},
{
"idx": 127,
"version": "7",
"when": 1766209394504,
"tag": "0127_flimsy_sister_grimm",
"breakpoints": true
},
{
"idx": 128,
"version": "7",
"when": 1766266581373,
"tag": "0128_swift_terrax",
"breakpoints": true
},
{
"idx": 129,
"version": "7",
"when": 1766275541149,
"tag": "0129_stormy_nightmare",
"breakpoints": true
}
]
}

View File

@@ -292,6 +292,9 @@ export const workflowExecutionLogs = pgTable(
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
workspaceId: text('workspace_id')
.notNull()
.references(() => workspace.id, { onDelete: 'cascade' }),
executionId: text('execution_id').notNull(),
stateSnapshotId: text('state_snapshot_id')
.notNull()
@@ -327,11 +330,14 @@ export const workflowExecutionLogs = pgTable(
executionIdUnique: uniqueIndex('workflow_execution_logs_execution_id_unique').on(
table.executionId
),
// Composite index for the new join-based query pattern
workflowStartedAtIdx: index('workflow_execution_logs_workflow_started_at_idx').on(
table.workflowId,
table.startedAt
),
workspaceStartedAtIdx: index('workflow_execution_logs_workspace_started_at_idx').on(
table.workspaceId,
table.startedAt
),
})
)
@@ -442,6 +448,9 @@ export const settings = pgTable('settings', {
// Notification preferences
errorNotificationsEnabled: boolean('error_notifications_enabled').notNull().default(true),
// Canvas preferences
snapToGridSize: integer('snap_to_grid_size').notNull().default(0), // 0 = off, 10-50 = grid size
// Copilot preferences - maps model_id to enabled/disabled boolean
copilotEnabledModels: jsonb('copilot_enabled_models').notNull().default('{}'),
@@ -1658,3 +1667,51 @@ export const ssoProvider = pgTable(
organizationIdIdx: index('sso_provider_organization_id_idx').on(table.organizationId),
})
)
// Usage logging for tracking individual billable operations.
// One row per chargeable event; queried by lib/billing/core/usage-log.ts.
export const usageLogCategoryEnum = pgEnum('usage_log_category', ['model', 'fixed'])
export const usageLogSourceEnum = pgEnum('usage_log_source', ['workflow', 'wand', 'copilot'])

export const usageLog = pgTable(
  'usage_log',
  {
    id: text('id').primaryKey(),
    // Owner of the charge; rows are deleted with the user (cascade)
    userId: text('user_id')
      .notNull()
      .references(() => user.id, { onDelete: 'cascade' }),
    // Charge category: 'model' (token-based) or 'fixed' (flat fee)
    category: usageLogCategoryEnum('category').notNull(),
    // What generated this charge: 'workflow', 'wand', 'copilot'
    source: usageLogSourceEnum('source').notNull(),
    // For model charges: model name (e.g., 'gpt-4o', 'claude-4.5-opus')
    // For fixed charges: charge type (e.g., 'execution_fee', 'search_query')
    description: text('description').notNull(),
    // Category-specific metadata (e.g., tokens for 'model' category)
    metadata: jsonb('metadata'),
    // Cost in USD; decimal with no precision maps to unconstrained Postgres numeric
    cost: decimal('cost').notNull(),
    // Optional context references; SET NULL so billing history survives
    // workspace/workflow deletion
    workspaceId: text('workspace_id').references(() => workspace.id, { onDelete: 'set null' }),
    workflowId: text('workflow_id').references(() => workflow.id, { onDelete: 'set null' }),
    // NOTE(review): plain text, no FK or index — presumably matches
    // workflow_execution_logs.execution_id; confirm before joining on it
    executionId: text('execution_id'),
    // Timestamp
    createdAt: timestamp('created_at').notNull().defaultNow(),
  },
  (table) => ({
    // Index for querying user's usage history (most common query)
    userCreatedAtIdx: index('usage_log_user_created_at_idx').on(table.userId, table.createdAt),
    // Index for filtering by source
    sourceIdx: index('usage_log_source_idx').on(table.source),
    // Index for workspace-specific queries
    workspaceIdIdx: index('usage_log_workspace_id_idx').on(table.workspaceId),
    // Index for workflow-specific queries
    workflowIdIdx: index('usage_log_workflow_id_idx').on(table.workflowId),
  })
)