Compare commits

..

8 Commits

Author SHA1 Message Date
Waleed Latif
4846f6c60d v0.3.37: azure OCR api key, wand SSE, CRON helm 2025-08-22 14:54:36 -07:00
Vikhyath Mondreti
be810013c7 feat(native-bg-tasks): support webhooks and async workflow executions without trigger.dev (#1106)
* feat(native-bg-tasks): support webhooks and async workflow executions without trigger"

* fix tests

* fix env var defaults and revert async workflow execution to always use trigger

* fix UI for hiding async

* hide entire toggle
2025-08-22 14:43:21 -07:00
Waleed Latif
1ee4263e60 feat(helm): added CRON jobs to helm charts (#1107) 2025-08-22 14:29:44 -07:00
Waleed Latif
60c4668682 fix(naming): prevent identical normalized block names (#1105) 2025-08-22 13:20:45 -07:00
Emir Karabeg
a268fb7c04 fix(chat-deploy): dark mode ui (#1101) 2025-08-22 12:23:11 -07:00
Waleed Latif
6c606750f5 improvement(signup): modify signup and login pages to not show social sign in when not configured, increase logo size (#1103) 2025-08-22 12:15:59 -07:00
Waleed Latif
e13adab14f improvement(wand): upgrade wand to use SSE (#1100)
* improvement(wand): upgrade wand to use SSE

* fix(ocr-azure): added OCR_AZURE_API_KEY envvar (#1102)

* make wand identical to chat panel
2025-08-22 12:01:16 -07:00
Waleed Latif
44bc12b474 fix(ocr-azure): added OCR_AZURE_API_KEY envvar (#1102) 2025-08-22 11:49:56 -07:00
26 changed files with 1319 additions and 765 deletions

View File

@@ -3,7 +3,6 @@
import { useEffect, useState } from 'react'
import { GithubIcon, GoogleIcon } from '@/components/icons'
import { Button } from '@/components/ui/button'
import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from '@/components/ui/tooltip'
import { client } from '@/lib/auth-client'
interface SocialLoginButtonsProps {
@@ -114,58 +113,16 @@ export function SocialLoginButtons({
</Button>
)
const renderGithubButton = () => {
if (githubAvailable) return githubButton
const hasAnyOAuthProvider = githubAvailable || googleAvailable
return (
<TooltipProvider>
<Tooltip>
<TooltipTrigger asChild>
<div>{githubButton}</div>
</TooltipTrigger>
<TooltipContent className='border-neutral-700 bg-neutral-800 text-white'>
<p>
GitHub login requires OAuth credentials to be configured. Add the following
environment variables:
</p>
<ul className='mt-2 space-y-1 text-neutral-300 text-xs'>
<li> GITHUB_CLIENT_ID</li>
<li> GITHUB_CLIENT_SECRET</li>
</ul>
</TooltipContent>
</Tooltip>
</TooltipProvider>
)
}
const renderGoogleButton = () => {
if (googleAvailable) return googleButton
return (
<TooltipProvider>
<Tooltip>
<TooltipTrigger asChild>
<div>{googleButton}</div>
</TooltipTrigger>
<TooltipContent className='border-neutral-700 bg-neutral-800 text-white'>
<p>
Google login requires OAuth credentials to be configured. Add the following
environment variables:
</p>
<ul className='mt-2 space-y-1 text-neutral-300 text-xs'>
<li> GOOGLE_CLIENT_ID</li>
<li> GOOGLE_CLIENT_SECRET</li>
</ul>
</TooltipContent>
</Tooltip>
</TooltipProvider>
)
if (!hasAnyOAuthProvider) {
return null
}
return (
<div className='grid gap-3'>
{renderGithubButton()}
{renderGoogleButton()}
{githubAvailable && githubButton}
{googleAvailable && googleButton}
</div>
)
}

View File

@@ -28,12 +28,12 @@ export default function AuthLayout({ children }: { children: React.ReactNode })
<img
src={brand.logoUrl}
alt={`${brand.name} Logo`}
width={42}
height={42}
className='h-[42px] w-[42px] object-contain'
width={56}
height={56}
className='h-[56px] w-[56px] object-contain'
/>
) : (
<Image src='/sim.svg' alt={`${brand.name} Logo`} width={42} height={42} />
<Image src='/sim.svg' alt={`${brand.name} Logo`} width={56} height={56} />
)}
</Link>
</div>

View File

@@ -366,11 +366,13 @@ export default function LoginPage({
callbackURL={callbackUrl}
/>
<div className='relative mt-2 py-4'>
<div className='absolute inset-0 flex items-center'>
<div className='w-full border-neutral-700/50 border-t' />
{(githubAvailable || googleAvailable) && (
<div className='relative mt-2 py-4'>
<div className='absolute inset-0 flex items-center'>
<div className='w-full border-neutral-700/50 border-t' />
</div>
</div>
</div>
)}
<form onSubmit={onSubmit} className='space-y-5'>
<div className='space-y-4'>

View File

@@ -381,11 +381,13 @@ function SignupFormContent({
isProduction={isProduction}
/>
<div className='relative mt-2 py-4'>
<div className='absolute inset-0 flex items-center'>
<div className='w-full border-neutral-700/50 border-t' />
{(githubAvailable || googleAvailable) && (
<div className='relative mt-2 py-4'>
<div className='absolute inset-0 flex items-center'>
<div className='w-full border-neutral-700/50 border-t' />
</div>
</div>
</div>
)}
<form onSubmit={onSubmit} className='space-y-5'>
<div className='space-y-4'>

View File

@@ -354,6 +354,18 @@ export function mockExecutionDependencies() {
}))
}
/**
* Mock Trigger.dev SDK (tasks.trigger and task factory) for tests that import background modules
*/
export function mockTriggerDevSdk() {
vi.mock('@trigger.dev/sdk', () => ({
tasks: {
trigger: vi.fn().mockResolvedValue({ id: 'mock-task-id' }),
},
task: vi.fn().mockReturnValue({}),
}))
}
export function mockWorkflowAccessValidation(shouldSucceed = true) {
if (shouldSucceed) {
vi.mock('@/app/api/workflows/middleware', () => ({

View File

@@ -95,12 +95,19 @@ export async function POST(req: NextRequest) {
{
stream,
historyLength: history.length,
endpoint: useWandAzure ? azureEndpoint : 'api.openai.com',
model: useWandAzure ? wandModelName : 'gpt-4o',
apiVersion: useWandAzure ? azureApiVersion : 'N/A',
}
)
// For streaming responses
if (stream) {
try {
logger.debug(
`[${requestId}] Starting streaming request to ${useWandAzure ? 'Azure OpenAI' : 'OpenAI'}`
)
const streamCompletion = await client.chat.completions.create({
model: useWandAzure ? wandModelName : 'gpt-4o',
messages: messages,
@@ -109,6 +116,8 @@ export async function POST(req: NextRequest) {
stream: true,
})
logger.debug(`[${requestId}] Stream connection established successfully`)
return new Response(
new ReadableStream({
async start(controller) {
@@ -118,21 +127,23 @@ export async function POST(req: NextRequest) {
for await (const chunk of streamCompletion) {
const content = chunk.choices[0]?.delta?.content || ''
if (content) {
// Use the same format as codegen API for consistency
// Use SSE format identical to chat streaming
controller.enqueue(
encoder.encode(`${JSON.stringify({ chunk: content, done: false })}\n`)
encoder.encode(`data: ${JSON.stringify({ chunk: content })}\n\n`)
)
}
}
// Send completion signal
controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: '', done: true })}\n`))
// Send completion signal in SSE format
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`))
controller.close()
logger.info(`[${requestId}] Wand generation streaming completed`)
} catch (streamError: any) {
logger.error(`[${requestId}] Streaming error`, { error: streamError.message })
controller.enqueue(
encoder.encode(`${JSON.stringify({ error: 'Streaming failed', done: true })}\n`)
encoder.encode(
`data: ${JSON.stringify({ error: 'Streaming failed', done: true })}\n\n`
)
)
controller.close()
}
@@ -140,9 +151,10 @@ export async function POST(req: NextRequest) {
}),
{
headers: {
'Content-Type': 'text/plain',
'Cache-Control': 'no-cache, no-transform',
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
},
}
)

View File

@@ -5,7 +5,22 @@ import { NextRequest } from 'next/server'
* @vitest-environment node
*/
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { createMockRequest, mockExecutionDependencies } from '@/app/api/__test-utils__/utils'
import {
createMockRequest,
mockExecutionDependencies,
mockTriggerDevSdk,
} from '@/app/api/__test-utils__/utils'
// Prefer mocking the background module to avoid loading Trigger.dev at all during tests
vi.mock('@/background/webhook-execution', () => ({
executeWebhookJob: vi.fn().mockResolvedValue({
success: true,
workflowId: 'test-workflow-id',
executionId: 'test-exec-id',
output: {},
executedAt: new Date().toISOString(),
}),
}))
const hasProcessedMessageMock = vi.fn().mockResolvedValue(false)
const markMessageAsProcessedMock = vi.fn().mockResolvedValue(true)
@@ -111,6 +126,7 @@ describe('Webhook Trigger API Route', () => {
vi.resetAllMocks()
mockExecutionDependencies()
mockTriggerDevSdk()
vi.doMock('@/services/queue', () => ({
RateLimiter: vi.fn().mockImplementation(() => ({
@@ -309,11 +325,7 @@ describe('Webhook Trigger API Route', () => {
const req = createMockRequest('POST', { event: 'test', id: 'test-123' })
const params = Promise.resolve({ path: 'test-path' })
vi.doMock('@trigger.dev/sdk', () => ({
tasks: {
trigger: vi.fn().mockResolvedValue({ id: 'mock-task-id' }),
},
}))
mockTriggerDevSdk()
const { POST } = await import('@/app/api/webhooks/trigger/[path]/route')
const response = await POST(req, { params })
@@ -339,11 +351,7 @@ describe('Webhook Trigger API Route', () => {
const req = createMockRequest('POST', { event: 'bearer.test' }, headers)
const params = Promise.resolve({ path: 'test-path' })
vi.doMock('@trigger.dev/sdk', () => ({
tasks: {
trigger: vi.fn().mockResolvedValue({ id: 'mock-task-id' }),
},
}))
mockTriggerDevSdk()
const { POST } = await import('@/app/api/webhooks/trigger/[path]/route')
const response = await POST(req, { params })
@@ -369,11 +377,7 @@ describe('Webhook Trigger API Route', () => {
const req = createMockRequest('POST', { event: 'custom.header.test' }, headers)
const params = Promise.resolve({ path: 'test-path' })
vi.doMock('@trigger.dev/sdk', () => ({
tasks: {
trigger: vi.fn().mockResolvedValue({ id: 'mock-task-id' }),
},
}))
mockTriggerDevSdk()
const { POST } = await import('@/app/api/webhooks/trigger/[path]/route')
const response = await POST(req, { params })

View File

@@ -2,12 +2,14 @@ import { tasks } from '@trigger.dev/sdk'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { env, isTruthy } from '@/lib/env'
import { createLogger } from '@/lib/logs/console/logger'
import {
handleSlackChallenge,
handleWhatsAppVerification,
validateMicrosoftTeamsSignature,
} from '@/lib/webhooks/utils'
import { executeWebhookJob } from '@/background/webhook-execution'
import { db } from '@/db'
import { subscription, webhook, workflow } from '@/db/schema'
import { RateLimiter } from '@/services/queue'
@@ -17,6 +19,7 @@ const logger = createLogger('WebhookTriggerAPI')
export const dynamic = 'force-dynamic'
export const maxDuration = 300
export const runtime = 'nodejs'
/**
* Webhook Verification Handler (GET)
@@ -330,10 +333,9 @@ export async function POST(
// Continue processing - better to risk usage limit bypass than fail webhook
}
// --- PHASE 5: Queue webhook execution via trigger.dev ---
// --- PHASE 5: Queue webhook execution (trigger.dev or direct based on env) ---
try {
// Queue the webhook execution task
const handle = await tasks.trigger('webhook-execution', {
const payload = {
webhookId: foundWebhook.id,
workflowId: foundWorkflow.id,
userId: foundWorkflow.userId,
@@ -342,11 +344,24 @@ export async function POST(
headers: Object.fromEntries(request.headers.entries()),
path,
blockId: foundWebhook.blockId,
})
}
logger.info(
`[${requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook`
)
const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)
if (useTrigger) {
const handle = await tasks.trigger('webhook-execution', payload)
logger.info(
`[${requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook`
)
} else {
// Fire-and-forget direct execution to avoid blocking webhook response
void executeWebhookJob(payload).catch((error) => {
logger.error(`[${requestId}] Direct webhook execution failed`, error)
})
logger.info(
`[${requestId}] Queued direct webhook execution for ${foundWebhook.provider} webhook (Trigger.dev disabled)`
)
}
// Return immediate acknowledgment with provider-specific format
if (foundWebhook.provider === 'microsoftteams') {

View File

@@ -540,7 +540,7 @@ export async function POST(
)
}
// Rate limit passed - trigger the task
// Rate limit passed - always use Trigger.dev for async executions
const handle = await tasks.trigger('workflow-execution', {
workflowId,
userId: authenticatedUserId,

View File

@@ -0,0 +1,167 @@
/* Force light mode for chat subdomain by overriding dark mode utilities */
/* This file uses CSS variables from globals.css light mode theme */
/* When inside the chat layout, force all light mode CSS variables */
.chat-light-wrapper {
/* Core Colors - from globals.css light mode */
--background: 0 0% 100%;
--foreground: 0 0% 3.9%;
/* Card Colors */
--card: 0 0% 99.2%;
--card-foreground: 0 0% 3.9%;
/* Popover Colors */
--popover: 0 0% 100%;
--popover-foreground: 0 0% 3.9%;
/* Primary Colors */
--primary: 0 0% 11.2%;
--primary-foreground: 0 0% 98%;
/* Secondary Colors */
--secondary: 0 0% 96.1%;
--secondary-foreground: 0 0% 11.2%;
/* Muted Colors */
--muted: 0 0% 96.1%;
--muted-foreground: 0 0% 46.9%;
/* Accent Colors */
--accent: 0 0% 92.5%;
--accent-foreground: 0 0% 11.2%;
/* Destructive Colors */
--destructive: 0 84.2% 60.2%;
--destructive-foreground: 0 0% 98%;
/* Border & Input Colors */
--border: 0 0% 89.8%;
--input: 0 0% 89.8%;
--ring: 0 0% 3.9%;
/* Border Radius */
--radius: 0.5rem;
/* Scrollbar Properties */
--scrollbar-track: 0 0% 85%;
--scrollbar-thumb: 0 0% 65%;
--scrollbar-thumb-hover: 0 0% 55%;
--scrollbar-size: 8px;
/* Workflow Properties */
--workflow-background: 0 0% 100%;
--workflow-dots: 0 0% 94.5%;
--card-background: 0 0% 99.2%;
--card-border: 0 0% 89.8%;
--card-text: 0 0% 3.9%;
--card-hover: 0 0% 96.1%;
/* Base Component Properties */
--base-muted-foreground: #737373;
/* Gradient Colors */
--gradient-primary: 263 85% 70%;
--gradient-secondary: 336 95% 65%;
/* Brand Colors */
--brand-primary-hex: #701ffc;
--brand-primary-hover-hex: #802fff;
--brand-secondary-hex: #6518e6;
--brand-accent-hex: #9d54ff;
--brand-accent-hover-hex: #a66fff;
--brand-background-hex: #0c0c0c;
/* UI Surface Colors */
--surface-elevated: #202020;
}
/* Override dark mode utility classes using CSS variables */
.chat-light-wrapper :is(.dark\:bg-black) {
background-color: hsl(var(--secondary));
}
.chat-light-wrapper :is(.dark\:bg-gray-900) {
background-color: hsl(var(--background));
}
.chat-light-wrapper :is(.dark\:bg-gray-800) {
background-color: hsl(var(--secondary));
}
.chat-light-wrapper :is(.dark\:bg-gray-700) {
background-color: hsl(var(--accent));
}
.chat-light-wrapper :is(.dark\:bg-gray-600) {
background-color: hsl(var(--muted));
}
.chat-light-wrapper :is(.dark\:bg-gray-300) {
background-color: hsl(var(--primary));
}
/* Text color overrides using CSS variables */
.chat-light-wrapper :is(.dark\:text-gray-100) {
color: hsl(var(--primary));
}
.chat-light-wrapper :is(.dark\:text-gray-200) {
color: hsl(var(--foreground));
}
.chat-light-wrapper :is(.dark\:text-gray-300) {
color: hsl(var(--muted-foreground));
}
.chat-light-wrapper :is(.dark\:text-gray-400) {
color: hsl(var(--muted-foreground));
}
.chat-light-wrapper :is(.dark\:text-neutral-600) {
color: hsl(var(--muted-foreground));
}
.chat-light-wrapper :is(.dark\:text-blue-400) {
color: var(--brand-accent-hex);
}
/* Border color overrides using CSS variables */
.chat-light-wrapper :is(.dark\:border-gray-700) {
border-color: hsl(var(--border));
}
.chat-light-wrapper :is(.dark\:border-gray-800) {
border-color: hsl(var(--border));
}
.chat-light-wrapper :is(.dark\:border-gray-600) {
border-color: hsl(var(--border));
}
.chat-light-wrapper :is(.dark\:divide-gray-700) > * + * {
border-color: hsl(var(--border));
}
/* Hover state overrides */
.chat-light-wrapper :is(.dark\:hover\:bg-gray-800\/60:hover) {
background-color: hsl(var(--card-hover));
}
/* Code blocks specific overrides using CSS variables */
.chat-light-wrapper pre:is(.dark\:bg-black) {
background-color: hsl(var(--workflow-dots));
}
.chat-light-wrapper code:is(.dark\:bg-gray-700) {
background-color: hsl(var(--accent));
}
.chat-light-wrapper code:is(.dark\:text-gray-200) {
color: hsl(var(--foreground));
}
/* Force color scheme */
.chat-light-wrapper {
color-scheme: light !important;
}

View File

@@ -481,7 +481,7 @@ export default function ChatClient({ subdomain }: { subdomain: string }) {
// Standard text-based chat interface
return (
<div className='fixed inset-0 z-[100] flex flex-col bg-background'>
<div className='fixed inset-0 z-[100] flex flex-col bg-background text-foreground'>
{/* Header component */}
<ChatHeader chatConfig={chatConfig} starCount={starCount} />

View File

@@ -22,53 +22,14 @@ export function ChatHeader({ chatConfig, starCount }: ChatHeaderProps) {
return (
<div className='flex items-center justify-between bg-background/95 px-6 py-4 pt-6 backdrop-blur supports-[backdrop-filter]:bg-background/60 md:px-8 md:pt-4'>
<div className='flex items-center gap-4'>
{customImage ? (
{customImage && (
<img
src={customImage}
alt={`${chatConfig?.title || 'Chat'} logo`}
className='h-12 w-12 rounded-md object-cover'
className='h-8 w-8 rounded-md object-cover'
/>
) : (
// Default Sim Studio logo when no custom image is provided
<div
className='flex h-12 w-12 items-center justify-center rounded-md'
style={{ backgroundColor: primaryColor }}
>
<svg
width='20'
height='20'
viewBox='0 0 50 50'
fill='none'
xmlns='http://www.w3.org/2000/svg'
>
<path
d='M34.1455 20.0728H16.0364C12.7026 20.0728 10 22.7753 10 26.1091V35.1637C10 38.4975 12.7026 41.2 16.0364 41.2H34.1455C37.4792 41.2 40.1818 38.4975 40.1818 35.1637V26.1091C40.1818 22.7753 37.4792 20.0728 34.1455 20.0728Z'
fill={primaryColor}
stroke='white'
strokeWidth='3.5'
strokeLinecap='round'
strokeLinejoin='round'
/>
<path
d='M25.0919 14.0364C26.7588 14.0364 28.1101 12.6851 28.1101 11.0182C28.1101 9.35129 26.7588 8 25.0919 8C23.425 8 22.0737 9.35129 22.0737 11.0182C22.0737 12.6851 23.425 14.0364 25.0919 14.0364Z'
fill={primaryColor}
stroke='white'
strokeWidth='4'
strokeLinecap='round'
strokeLinejoin='round'
/>
<path
d='M25.0915 14.856V19.0277M20.5645 32.1398V29.1216M29.619 29.1216V32.1398'
stroke='white'
strokeWidth='4'
strokeLinecap='round'
strokeLinejoin='round'
/>
<circle cx='25' cy='11' r='2' fill={primaryColor} />
</svg>
</div>
)}
<h2 className='font-medium text-lg'>
<h2 className='font-medium text-foreground text-lg'>
{chatConfig?.customizations?.headerText || chatConfig?.title || 'Chat'}
</h2>
</div>

View File

@@ -2,10 +2,10 @@
export function ChatLoadingState() {
return (
<div className='flex min-h-screen items-center justify-center bg-gray-50'>
<div className='flex min-h-screen items-center justify-center bg-background text-foreground'>
<div className='animate-pulse text-center'>
<div className='mx-auto mb-4 h-8 w-48 rounded bg-gray-200' />
<div className='mx-auto h-4 w-64 rounded bg-gray-200' />
<div className='mx-auto mb-4 h-8 w-48 rounded bg-muted' />
<div className='mx-auto h-4 w-64 rounded bg-muted' />
</div>
</div>
)

View File

@@ -0,0 +1,19 @@
'use client'
import { ThemeProvider } from 'next-themes'
import './chat-client.css'
export default function ChatLayout({ children }: { children: React.ReactNode }) {
return (
<ThemeProvider
attribute='class'
forcedTheme='light'
enableSystem={false}
disableTransitionOnChange
>
<div className='light chat-light-wrapper' style={{ colorScheme: 'light' }}>
{children}
</div>
</ThemeProvider>
)
}

View File

@@ -11,6 +11,7 @@ import {
DropdownMenuTrigger,
} from '@/components/ui/dropdown-menu'
import { Label } from '@/components/ui/label'
import { getEnv, isTruthy } from '@/lib/env'
interface ExampleCommandProps {
command: string
@@ -32,6 +33,7 @@ export function ExampleCommand({
}: ExampleCommandProps) {
const [mode, setMode] = useState<ExampleMode>('sync')
const [exampleType, setExampleType] = useState<ExampleType>('execute')
const isAsyncEnabled = isTruthy(getEnv('NEXT_PUBLIC_TRIGGER_DEV_ENABLED'))
// Format the curl command to use a placeholder for the API key
const formatCurlCommand = (command: string, apiKey: string) => {
@@ -146,62 +148,67 @@ export function ExampleCommand({
<div className='space-y-1.5'>
<div className='flex items-center justify-between'>
{showLabel && <Label className='font-medium text-sm'>Example</Label>}
<div className='flex items-center gap-1'>
<Button
variant='outline'
size='sm'
onClick={() => setMode('sync')}
className={`h-6 min-w-[50px] px-2 py-1 text-xs transition-none ${
mode === 'sync'
? 'border-primary bg-primary text-primary-foreground hover:border-primary hover:bg-primary hover:text-primary-foreground'
: ''
}`}
>
Sync
</Button>
<Button
variant='outline'
size='sm'
onClick={() => setMode('async')}
className={`h-6 min-w-[50px] px-2 py-1 text-xs transition-none ${
mode === 'async'
? 'border-primary bg-primary text-primary-foreground hover:border-primary hover:bg-primary hover:text-primary-foreground'
: ''
}`}
>
Async
</Button>
<DropdownMenu>
<DropdownMenuTrigger asChild>
<Button
variant='outline'
size='sm'
className='h-6 min-w-[140px] justify-between px-2 py-1 text-xs'
disabled={mode === 'sync'}
>
<span className='truncate'>{getExampleTitle()}</span>
<ChevronDown className='ml-1 h-3 w-3 flex-shrink-0' />
</Button>
</DropdownMenuTrigger>
<DropdownMenuContent align='end'>
<DropdownMenuItem
className='cursor-pointer'
onClick={() => setExampleType('execute')}
>
Async Execution
</DropdownMenuItem>
<DropdownMenuItem className='cursor-pointer' onClick={() => setExampleType('status')}>
Check Job Status
</DropdownMenuItem>
<DropdownMenuItem
className='cursor-pointer'
onClick={() => setExampleType('rate-limits')}
>
Rate Limits & Usage
</DropdownMenuItem>
</DropdownMenuContent>
</DropdownMenu>
</div>
{isAsyncEnabled && (
<div className='flex items-center gap-1'>
<Button
variant='outline'
size='sm'
onClick={() => setMode('sync')}
className={`h-6 min-w-[50px] px-2 py-1 text-xs transition-none ${
mode === 'sync'
? 'border-primary bg-primary text-primary-foreground hover:border-primary hover:bg-primary hover:text-primary-foreground'
: ''
}`}
>
Sync
</Button>
<Button
variant='outline'
size='sm'
onClick={() => setMode('async')}
className={`h-6 min-w-[50px] px-2 py-1 text-xs transition-none ${
mode === 'async'
? 'border-primary bg-primary text-primary-foreground hover:border-primary hover:bg-primary hover:text-primary-foreground'
: ''
}`}
>
Async
</Button>
<DropdownMenu>
<DropdownMenuTrigger asChild>
<Button
variant='outline'
size='sm'
className='h-6 min-w-[140px] justify-between px-2 py-1 text-xs'
disabled={mode === 'sync'}
>
<span className='truncate'>{getExampleTitle()}</span>
<ChevronDown className='ml-1 h-3 w-3 flex-shrink-0' />
</Button>
</DropdownMenuTrigger>
<DropdownMenuContent align='end'>
<DropdownMenuItem
className='cursor-pointer'
onClick={() => setExampleType('execute')}
>
Async Execution
</DropdownMenuItem>
<DropdownMenuItem
className='cursor-pointer'
onClick={() => setExampleType('status')}
>
Check Job Status
</DropdownMenuItem>
<DropdownMenuItem
className='cursor-pointer'
onClick={() => setExampleType('rate-limits')}
>
Rate Limits & Usage
</DropdownMenuItem>
</DropdownMenuContent>
</DropdownMenu>
</div>
)}
</div>
<div className='group relative h-[120px] rounded-md border bg-background transition-colors hover:bg-muted/50'>

View File

@@ -198,35 +198,37 @@ export function useWand({
const { done, value } = await reader.read()
if (done) break
// Process incoming chunks
const text = decoder.decode(value)
const lines = text.split('\n').filter((line) => line.trim() !== '')
// Process incoming chunks using SSE format (identical to Chat panel)
const chunk = decoder.decode(value)
const lines = chunk.split('\n\n')
for (const line of lines) {
try {
const data = JSON.parse(line)
if (line.startsWith('data: ')) {
try {
const data = JSON.parse(line.substring(6))
// Check if there's an error
if (data.error) {
throw new Error(data.error)
}
// Process chunk
if (data.chunk && !data.done) {
accumulatedContent += data.chunk
// Stream each chunk to the UI immediately
if (onStreamChunk) {
onStreamChunk(data.chunk)
// Check if there's an error
if (data.error) {
throw new Error(data.error)
}
}
// Check if streaming is complete
if (data.done) {
break
// Process chunk
if (data.chunk) {
accumulatedContent += data.chunk
// Stream each chunk to the UI immediately
if (onStreamChunk) {
onStreamChunk(data.chunk)
}
}
// Check if streaming is complete
if (data.done) {
break
}
} catch (parseError) {
// Continue processing other lines
logger.debug('Failed to parse SSE line', { line, parseError })
}
} catch (parseError) {
// Continue processing other lines
logger.debug('Failed to parse streaming line', { line, parseError })
}
}
}

View File

@@ -17,362 +17,363 @@ import { mergeSubblockState } from '@/stores/workflows/server-utils'
const logger = createLogger('TriggerWebhookExecution')
export type WebhookExecutionPayload = {
webhookId: string
workflowId: string
userId: string
provider: string
body: any
headers: Record<string, string>
path: string
blockId?: string
}
export async function executeWebhookJob(payload: WebhookExecutionPayload) {
const executionId = uuidv4()
const requestId = executionId.slice(0, 8)
logger.info(`[${requestId}] Starting webhook execution`, {
webhookId: payload.webhookId,
workflowId: payload.workflowId,
provider: payload.provider,
userId: payload.userId,
executionId,
})
// Initialize logging session outside try block so it's available in catch
const loggingSession = new LoggingSession(payload.workflowId, executionId, 'webhook', requestId)
try {
// Check usage limits first
const usageCheck = await checkServerSideUsageLimits(payload.userId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping webhook execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId: payload.workflowId,
}
)
throw new Error(
usageCheck.message ||
'Usage limit exceeded. Please upgrade your plan to continue using webhooks.'
)
}
// Load workflow from normalized tables
const workflowData = await loadWorkflowFromNormalizedTables(payload.workflowId)
if (!workflowData) {
throw new Error(`Workflow not found: ${payload.workflowId}`)
}
const { blocks, edges, loops, parallels } = workflowData
// Get environment variables (matching workflow-execution pattern)
const [userEnv] = await db
.select()
.from(environmentTable)
.where(eq(environmentTable.userId, payload.userId))
.limit(1)
let decryptedEnvVars: Record<string, string> = {}
if (userEnv) {
const decryptionPromises = Object.entries((userEnv.variables as any) || {}).map(
async ([key, encryptedValue]) => {
try {
const { decrypted } = await decryptSecret(encryptedValue as string)
return [key, decrypted] as const
} catch (error: any) {
logger.error(`[${requestId}] Failed to decrypt environment variable "${key}":`, error)
throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`)
}
}
)
const decryptedPairs = await Promise.all(decryptionPromises)
decryptedEnvVars = Object.fromEntries(decryptedPairs)
}
// Start logging session
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: '', // TODO: Get from workflow if needed
variables: decryptedEnvVars,
})
// Merge subblock states (matching workflow-execution pattern)
const mergedStates = mergeSubblockState(blocks, {})
// Process block states for execution
const processedBlockStates = Object.entries(mergedStates).reduce(
(acc, [blockId, blockState]) => {
acc[blockId] = Object.entries(blockState.subBlocks).reduce(
(subAcc, [key, subBlock]) => {
subAcc[key] = subBlock.value
return subAcc
},
{} as Record<string, any>
)
return acc
},
{} as Record<string, Record<string, any>>
)
// Handle workflow variables (for now, use empty object since we don't have workflow metadata)
const workflowVariables = {}
// Create serialized workflow
const serializer = new Serializer()
const serializedWorkflow = serializer.serializeWorkflow(
mergedStates,
edges,
loops || {},
parallels || {},
true // Enable validation during execution
)
// Handle special Airtable case
if (payload.provider === 'airtable') {
logger.info(`[${requestId}] Processing Airtable webhook via fetchAndProcessAirtablePayloads`)
// Load the actual webhook record from database to get providerConfig
const [webhookRecord] = await db
.select()
.from(webhook)
.where(eq(webhook.id, payload.webhookId))
.limit(1)
if (!webhookRecord) {
throw new Error(`Webhook record not found: ${payload.webhookId}`)
}
const webhookData = {
id: payload.webhookId,
provider: payload.provider,
providerConfig: webhookRecord.providerConfig,
}
// Create a mock workflow object for Airtable processing
const mockWorkflow = {
id: payload.workflowId,
userId: payload.userId,
}
// Get the processed Airtable input
const airtableInput = await fetchAndProcessAirtablePayloads(
webhookData,
mockWorkflow,
requestId
)
// If we got input (changes), execute the workflow like other providers
if (airtableInput) {
logger.info(`[${requestId}] Executing workflow with Airtable changes`)
// Create executor and execute (same as standard webhook flow)
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: airtableInput,
workflowVariables,
contextExtensions: {
executionId,
workspaceId: '',
},
})
// Set up logging on the executor
loggingSession.setupExecutor(executor)
// Execute the workflow
const result = await executor.execute(payload.workflowId, payload.blockId)
// Check if we got a StreamingExecution result
const executionResult =
'stream' in result && 'execution' in result ? result.execution : result
logger.info(`[${requestId}] Airtable webhook execution completed`, {
success: executionResult.success,
workflowId: payload.workflowId,
})
// Update workflow run counts on success
if (executionResult.success) {
await updateWorkflowRunCounts(payload.workflowId)
// Track execution in user stats
await db
.update(userStats)
.set({
totalWebhookTriggers: sql`total_webhook_triggers + 1`,
lastActive: sql`now()`,
})
.where(eq(userStats.userId, payload.userId))
}
// Build trace spans and complete logging session
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: executionResult.output || {},
traceSpans: traceSpans as any,
})
return {
success: executionResult.success,
workflowId: payload.workflowId,
executionId,
output: executionResult.output,
executedAt: new Date().toISOString(),
provider: payload.provider,
}
}
// No changes to process
logger.info(`[${requestId}] No Airtable changes to process`)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
finalOutput: { message: 'No Airtable changes to process' },
traceSpans: [],
})
return {
success: true,
workflowId: payload.workflowId,
executionId,
output: { message: 'No Airtable changes to process' },
executedAt: new Date().toISOString(),
}
}
// Format input for standard webhooks
const mockWebhook = {
provider: payload.provider,
blockId: payload.blockId,
}
const mockWorkflow = {
id: payload.workflowId,
userId: payload.userId,
}
const mockRequest = {
headers: new Map(Object.entries(payload.headers)),
} as any
const input = formatWebhookInput(mockWebhook, mockWorkflow, payload.body, mockRequest)
if (!input && payload.provider === 'whatsapp') {
logger.info(`[${requestId}] No messages in WhatsApp payload, skipping execution`)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
finalOutput: { message: 'No messages in WhatsApp payload' },
traceSpans: [],
})
return {
success: true,
workflowId: payload.workflowId,
executionId,
output: { message: 'No messages in WhatsApp payload' },
executedAt: new Date().toISOString(),
}
}
// Create executor and execute
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: input || {},
workflowVariables,
contextExtensions: {
executionId,
workspaceId: '', // TODO: Get from workflow if needed - see comment on line 103
},
})
// Set up logging on the executor
loggingSession.setupExecutor(executor)
logger.info(`[${requestId}] Executing workflow for ${payload.provider} webhook`)
// Execute the workflow
const result = await executor.execute(payload.workflowId, payload.blockId)
// Check if we got a StreamingExecution result
const executionResult = 'stream' in result && 'execution' in result ? result.execution : result
logger.info(`[${requestId}] Webhook execution completed`, {
success: executionResult.success,
workflowId: payload.workflowId,
provider: payload.provider,
})
// Update workflow run counts on success
if (executionResult.success) {
await updateWorkflowRunCounts(payload.workflowId)
// Track execution in user stats
await db
.update(userStats)
.set({
totalWebhookTriggers: sql`total_webhook_triggers + 1`,
lastActive: sql`now()`,
})
.where(eq(userStats.userId, payload.userId))
}
// Build trace spans and complete logging session
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: executionResult.output || {},
traceSpans: traceSpans as any,
})
return {
success: executionResult.success,
workflowId: payload.workflowId,
executionId,
output: executionResult.output,
executedAt: new Date().toISOString(),
provider: payload.provider,
}
} catch (error: any) {
logger.error(`[${requestId}] Webhook execution failed`, {
error: error.message,
stack: error.stack,
workflowId: payload.workflowId,
provider: payload.provider,
})
// Complete logging session with error (matching workflow-execution pattern)
try {
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: {
message: error.message || 'Webhook execution failed',
stackTrace: error.stack,
},
})
} catch (loggingError) {
logger.error(`[${requestId}] Failed to complete logging session`, loggingError)
}
throw error
}
}
export const webhookExecution = task({
id: 'webhook-execution',
retry: {
maxAttempts: 1,
},
run: async (payload: {
webhookId: string
workflowId: string
userId: string
provider: string
body: any
headers: Record<string, string>
path: string
blockId?: string
}) => {
const executionId = uuidv4()
const requestId = executionId.slice(0, 8)
logger.info(`[${requestId}] Starting webhook execution via trigger.dev`, {
webhookId: payload.webhookId,
workflowId: payload.workflowId,
provider: payload.provider,
userId: payload.userId,
executionId,
})
// Initialize logging session outside try block so it's available in catch
const loggingSession = new LoggingSession(payload.workflowId, executionId, 'webhook', requestId)
try {
// Check usage limits first
const usageCheck = await checkServerSideUsageLimits(payload.userId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping webhook execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId: payload.workflowId,
}
)
throw new Error(
usageCheck.message ||
'Usage limit exceeded. Please upgrade your plan to continue using webhooks.'
)
}
// Load workflow from normalized tables
const workflowData = await loadWorkflowFromNormalizedTables(payload.workflowId)
if (!workflowData) {
throw new Error(`Workflow not found: ${payload.workflowId}`)
}
const { blocks, edges, loops, parallels } = workflowData
// Get environment variables (matching workflow-execution pattern)
const [userEnv] = await db
.select()
.from(environmentTable)
.where(eq(environmentTable.userId, payload.userId))
.limit(1)
let decryptedEnvVars: Record<string, string> = {}
if (userEnv) {
const decryptionPromises = Object.entries((userEnv.variables as any) || {}).map(
async ([key, encryptedValue]) => {
try {
const { decrypted } = await decryptSecret(encryptedValue as string)
return [key, decrypted] as const
} catch (error: any) {
logger.error(`[${requestId}] Failed to decrypt environment variable "${key}":`, error)
throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`)
}
}
)
const decryptedPairs = await Promise.all(decryptionPromises)
decryptedEnvVars = Object.fromEntries(decryptedPairs)
}
// Start logging session
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: '', // TODO: Get from workflow if needed
variables: decryptedEnvVars,
})
// Merge subblock states (matching workflow-execution pattern)
const mergedStates = mergeSubblockState(blocks, {})
// Process block states for execution
const processedBlockStates = Object.entries(mergedStates).reduce(
(acc, [blockId, blockState]) => {
acc[blockId] = Object.entries(blockState.subBlocks).reduce(
(subAcc, [key, subBlock]) => {
subAcc[key] = subBlock.value
return subAcc
},
{} as Record<string, any>
)
return acc
},
{} as Record<string, Record<string, any>>
)
// Handle workflow variables (for now, use empty object since we don't have workflow metadata)
const workflowVariables = {}
// Create serialized workflow
const serializer = new Serializer()
const serializedWorkflow = serializer.serializeWorkflow(
mergedStates,
edges,
loops || {},
parallels || {},
true // Enable validation during execution
)
// Handle special Airtable case
if (payload.provider === 'airtable') {
logger.info(
`[${requestId}] Processing Airtable webhook via fetchAndProcessAirtablePayloads`
)
// Load the actual webhook record from database to get providerConfig
const [webhookRecord] = await db
.select()
.from(webhook)
.where(eq(webhook.id, payload.webhookId))
.limit(1)
if (!webhookRecord) {
throw new Error(`Webhook record not found: ${payload.webhookId}`)
}
const webhookData = {
id: payload.webhookId,
provider: payload.provider,
providerConfig: webhookRecord.providerConfig,
}
// Create a mock workflow object for Airtable processing
const mockWorkflow = {
id: payload.workflowId,
userId: payload.userId,
}
// Get the processed Airtable input
const airtableInput = await fetchAndProcessAirtablePayloads(
webhookData,
mockWorkflow,
requestId
)
// If we got input (changes), execute the workflow like other providers
if (airtableInput) {
logger.info(`[${requestId}] Executing workflow with Airtable changes`)
// Create executor and execute (same as standard webhook flow)
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: airtableInput,
workflowVariables,
contextExtensions: {
executionId,
workspaceId: '',
},
})
// Set up logging on the executor
loggingSession.setupExecutor(executor)
// Execute the workflow
const result = await executor.execute(payload.workflowId, payload.blockId)
// Check if we got a StreamingExecution result
const executionResult =
'stream' in result && 'execution' in result ? result.execution : result
logger.info(`[${requestId}] Airtable webhook execution completed`, {
success: executionResult.success,
workflowId: payload.workflowId,
})
// Update workflow run counts on success
if (executionResult.success) {
await updateWorkflowRunCounts(payload.workflowId)
// Track execution in user stats
await db
.update(userStats)
.set({
totalWebhookTriggers: sql`total_webhook_triggers + 1`,
lastActive: sql`now()`,
})
.where(eq(userStats.userId, payload.userId))
}
// Build trace spans and complete logging session
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: executionResult.output || {},
traceSpans: traceSpans as any,
})
return {
success: executionResult.success,
workflowId: payload.workflowId,
executionId,
output: executionResult.output,
executedAt: new Date().toISOString(),
provider: payload.provider,
}
}
// No changes to process
logger.info(`[${requestId}] No Airtable changes to process`)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
finalOutput: { message: 'No Airtable changes to process' },
traceSpans: [],
})
return {
success: true,
workflowId: payload.workflowId,
executionId,
output: { message: 'No Airtable changes to process' },
executedAt: new Date().toISOString(),
}
}
// Format input for standard webhooks
const mockWebhook = {
provider: payload.provider,
blockId: payload.blockId,
}
const mockWorkflow = {
id: payload.workflowId,
userId: payload.userId,
}
const mockRequest = {
headers: new Map(Object.entries(payload.headers)),
} as any
const input = formatWebhookInput(mockWebhook, mockWorkflow, payload.body, mockRequest)
if (!input && payload.provider === 'whatsapp') {
logger.info(`[${requestId}] No messages in WhatsApp payload, skipping execution`)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
finalOutput: { message: 'No messages in WhatsApp payload' },
traceSpans: [],
})
return {
success: true,
workflowId: payload.workflowId,
executionId,
output: { message: 'No messages in WhatsApp payload' },
executedAt: new Date().toISOString(),
}
}
// Create executor and execute
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: input || {},
workflowVariables,
contextExtensions: {
executionId,
workspaceId: '', // TODO: Get from workflow if needed - see comment on line 103
},
})
// Set up logging on the executor
loggingSession.setupExecutor(executor)
logger.info(`[${requestId}] Executing workflow for ${payload.provider} webhook`)
// Execute the workflow
const result = await executor.execute(payload.workflowId, payload.blockId)
// Check if we got a StreamingExecution result
const executionResult =
'stream' in result && 'execution' in result ? result.execution : result
logger.info(`[${requestId}] Webhook execution completed`, {
success: executionResult.success,
workflowId: payload.workflowId,
provider: payload.provider,
})
// Update workflow run counts on success
if (executionResult.success) {
await updateWorkflowRunCounts(payload.workflowId)
// Track execution in user stats
await db
.update(userStats)
.set({
totalWebhookTriggers: sql`total_webhook_triggers + 1`,
lastActive: sql`now()`,
})
.where(eq(userStats.userId, payload.userId))
}
// Build trace spans and complete logging session
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: executionResult.output || {},
traceSpans: traceSpans as any,
})
return {
success: executionResult.success,
workflowId: payload.workflowId,
executionId,
output: executionResult.output,
executedAt: new Date().toISOString(),
provider: payload.provider,
}
} catch (error: any) {
logger.error(`[${requestId}] Webhook execution failed`, {
error: error.message,
stack: error.stack,
workflowId: payload.workflowId,
provider: payload.provider,
})
// Complete logging session with error (matching workflow-execution pattern)
try {
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: {
message: error.message || 'Webhook execution failed',
stackTrace: error.stack,
},
})
} catch (loggingError) {
logger.error(`[${requestId}] Failed to complete logging session`, loggingError)
}
throw error // Let Trigger.dev handle retries
}
},
run: async (payload: WebhookExecutionPayload) => executeWebhookJob(payload),
})

View File

@@ -16,200 +16,202 @@ import { mergeSubblockState } from '@/stores/workflows/server-utils'
const logger = createLogger('TriggerWorkflowExecution')
export type WorkflowExecutionPayload = {
workflowId: string
userId: string
input?: any
triggerType?: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
metadata?: Record<string, any>
}
export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
const workflowId = payload.workflowId
const executionId = uuidv4()
const requestId = executionId.slice(0, 8)
logger.info(`[${requestId}] Starting workflow execution: ${workflowId}`, {
userId: payload.userId,
triggerType: payload.triggerType,
executionId,
})
// Initialize logging session
const triggerType = payload.triggerType || 'api'
const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId)
try {
const usageCheck = await checkServerSideUsageLimits(payload.userId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping workflow execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId: payload.workflowId,
}
)
throw new Error(
usageCheck.message ||
'Usage limit exceeded. Please upgrade your plan to continue using workflows.'
)
}
// Load workflow data from deployed state (this task is only used for API executions right now)
const workflowData = await loadDeployedWorkflowState(workflowId)
const { blocks, edges, loops, parallels } = workflowData
// Merge subblock states (server-safe version doesn't need workflowId)
const mergedStates = mergeSubblockState(blocks, {})
// Process block states for execution
const processedBlockStates = Object.entries(mergedStates).reduce(
(acc, [blockId, blockState]) => {
acc[blockId] = Object.entries(blockState.subBlocks).reduce(
(subAcc, [key, subBlock]) => {
subAcc[key] = subBlock.value
return subAcc
},
{} as Record<string, any>
)
return acc
},
{} as Record<string, Record<string, any>>
)
// Get environment variables
const [userEnv] = await db
.select()
.from(environmentTable)
.where(eq(environmentTable.userId, payload.userId))
.limit(1)
let decryptedEnvVars: Record<string, string> = {}
if (userEnv) {
const decryptionPromises = Object.entries((userEnv.variables as any) || {}).map(
async ([key, encryptedValue]) => {
try {
const { decrypted } = await decryptSecret(encryptedValue as string)
return [key, decrypted] as const
} catch (error: any) {
logger.error(`[${requestId}] Failed to decrypt environment variable "${key}":`, error)
throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`)
}
}
)
const decryptedPairs = await Promise.all(decryptionPromises)
decryptedEnvVars = Object.fromEntries(decryptedPairs)
}
// Start logging session
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: '', // TODO: Get from workflow if needed
variables: decryptedEnvVars,
})
// Create serialized workflow
const serializer = new Serializer()
const serializedWorkflow = serializer.serializeWorkflow(
mergedStates,
edges,
loops || {},
parallels || {},
true // Enable validation during execution
)
// Create executor and execute
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: payload.input || {},
workflowVariables: {},
contextExtensions: {
executionId,
workspaceId: '', // TODO: Get from workflow if needed - see comment on line 120
},
})
// Set up logging on the executor
loggingSession.setupExecutor(executor)
const result = await executor.execute(workflowId)
// Handle streaming vs regular result
const executionResult = 'stream' in result && 'execution' in result ? result.execution : result
logger.info(`[${requestId}] Workflow execution completed: ${workflowId}`, {
success: executionResult.success,
executionTime: executionResult.metadata?.duration,
executionId,
})
// Update workflow run counts on success
if (executionResult.success) {
await updateWorkflowRunCounts(workflowId)
// Track execution in user stats
const statsUpdate =
triggerType === 'api'
? { totalApiCalls: sql`total_api_calls + 1` }
: triggerType === 'webhook'
? { totalWebhookTriggers: sql`total_webhook_triggers + 1` }
: triggerType === 'schedule'
? { totalScheduledExecutions: sql`total_scheduled_executions + 1` }
: { totalManualExecutions: sql`total_manual_executions + 1` }
await db
.update(userStats)
.set({
...statsUpdate,
lastActive: sql`now()`,
})
.where(eq(userStats.userId, payload.userId))
}
// Build trace spans and complete logging session (for both success and failure)
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: executionResult.output || {},
traceSpans: traceSpans as any,
})
return {
success: executionResult.success,
workflowId: payload.workflowId,
executionId,
output: executionResult.output,
executedAt: new Date().toISOString(),
metadata: payload.metadata,
}
} catch (error: any) {
logger.error(`[${requestId}] Workflow execution failed: ${workflowId}`, {
error: error.message,
stack: error.stack,
})
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: {
message: error.message || 'Workflow execution failed',
stackTrace: error.stack,
},
})
throw error
}
}
export const workflowExecution = task({
id: 'workflow-execution',
retry: {
maxAttempts: 1,
},
run: async (payload: {
workflowId: string
userId: string
input?: any
triggerType?: string
metadata?: Record<string, any>
}) => {
const workflowId = payload.workflowId
const executionId = uuidv4()
const requestId = executionId.slice(0, 8)
logger.info(`[${requestId}] Starting Trigger.dev workflow execution: ${workflowId}`, {
userId: payload.userId,
triggerType: payload.triggerType,
executionId,
})
// Initialize logging session
const triggerType =
(payload.triggerType as 'api' | 'webhook' | 'schedule' | 'manual' | 'chat') || 'api'
const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId)
try {
const usageCheck = await checkServerSideUsageLimits(payload.userId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping workflow execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId: payload.workflowId,
}
)
throw new Error(
usageCheck.message ||
'Usage limit exceeded. Please upgrade your plan to continue using workflows.'
)
}
// Load workflow data from deployed state (this task is only used for API executions right now)
const workflowData = await loadDeployedWorkflowState(workflowId)
const { blocks, edges, loops, parallels } = workflowData
// Merge subblock states (server-safe version doesn't need workflowId)
const mergedStates = mergeSubblockState(blocks, {})
// Process block states for execution
const processedBlockStates = Object.entries(mergedStates).reduce(
(acc, [blockId, blockState]) => {
acc[blockId] = Object.entries(blockState.subBlocks).reduce(
(subAcc, [key, subBlock]) => {
subAcc[key] = subBlock.value
return subAcc
},
{} as Record<string, any>
)
return acc
},
{} as Record<string, Record<string, any>>
)
// Get environment variables
const [userEnv] = await db
.select()
.from(environmentTable)
.where(eq(environmentTable.userId, payload.userId))
.limit(1)
let decryptedEnvVars: Record<string, string> = {}
if (userEnv) {
const decryptionPromises = Object.entries((userEnv.variables as any) || {}).map(
async ([key, encryptedValue]) => {
try {
const { decrypted } = await decryptSecret(encryptedValue as string)
return [key, decrypted] as const
} catch (error: any) {
logger.error(`[${requestId}] Failed to decrypt environment variable "${key}":`, error)
throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`)
}
}
)
const decryptedPairs = await Promise.all(decryptionPromises)
decryptedEnvVars = Object.fromEntries(decryptedPairs)
}
// Start logging session
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: '', // TODO: Get from workflow if needed
variables: decryptedEnvVars,
})
// Create serialized workflow
const serializer = new Serializer()
const serializedWorkflow = serializer.serializeWorkflow(
mergedStates,
edges,
loops || {},
parallels || {},
true // Enable validation during execution
)
// Create executor and execute
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: payload.input || {},
workflowVariables: {},
contextExtensions: {
executionId,
workspaceId: '', // TODO: Get from workflow if needed - see comment on line 120
},
})
// Set up logging on the executor
loggingSession.setupExecutor(executor)
const result = await executor.execute(workflowId)
// Handle streaming vs regular result
const executionResult =
'stream' in result && 'execution' in result ? result.execution : result
logger.info(`[${requestId}] Workflow execution completed: ${workflowId}`, {
success: executionResult.success,
executionTime: executionResult.metadata?.duration,
executionId,
})
// Update workflow run counts on success
if (executionResult.success) {
await updateWorkflowRunCounts(workflowId)
// Track execution in user stats
const statsUpdate =
triggerType === 'api'
? { totalApiCalls: sql`total_api_calls + 1` }
: triggerType === 'webhook'
? { totalWebhookTriggers: sql`total_webhook_triggers + 1` }
: triggerType === 'schedule'
? { totalScheduledExecutions: sql`total_scheduled_executions + 1` }
: { totalManualExecutions: sql`total_manual_executions + 1` }
await db
.update(userStats)
.set({
...statsUpdate,
lastActive: sql`now()`,
})
.where(eq(userStats.userId, payload.userId))
}
// Build trace spans and complete logging session (for both success and failure)
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: executionResult.output || {},
traceSpans: traceSpans as any,
})
return {
success: executionResult.success,
workflowId: payload.workflowId,
executionId,
output: executionResult.output,
executedAt: new Date().toISOString(),
metadata: payload.metadata,
}
} catch (error: any) {
logger.error(`[${requestId}] Workflow execution failed: ${workflowId}`, {
error: error.message,
stack: error.stack,
})
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: {
message: error.message || 'Workflow execution failed',
stackTrace: error.stack,
},
})
throw error // Let Trigger.dev handle retries
}
},
run: async (payload: WorkflowExecutionPayload) => executeWorkflowJob(payload),
})

View File

@@ -116,7 +116,7 @@ async function parseDocument(
}> {
const isPDF = mimeType === 'application/pdf'
const hasAzureMistralOCR =
env.AZURE_OPENAI_API_KEY && env.OCR_AZURE_ENDPOINT && env.OCR_AZURE_MODEL_NAME
env.OCR_AZURE_API_KEY && env.OCR_AZURE_ENDPOINT && env.OCR_AZURE_MODEL_NAME
const hasMistralOCR = env.MISTRAL_API_KEY
// Check Azure Mistral OCR configuration
@@ -288,7 +288,7 @@ async function makeOCRRequest(endpoint: string, headers: Record<string, string>,
async function parseWithAzureMistralOCR(fileUrl: string, filename: string, mimeType: string) {
validateOCRConfig(
env.AZURE_OPENAI_API_KEY,
env.OCR_AZURE_API_KEY,
env.OCR_AZURE_ENDPOINT,
env.OCR_AZURE_MODEL_NAME,
'Azure Mistral OCR'
@@ -306,7 +306,7 @@ async function parseWithAzureMistralOCR(fileUrl: string, filename: string, mimeT
env.OCR_AZURE_ENDPOINT!,
{
'Content-Type': 'application/json',
Authorization: `Bearer ${env.AZURE_OPENAI_API_KEY}`,
Authorization: `Bearer ${env.OCR_AZURE_API_KEY}`,
},
{
model: env.OCR_AZURE_MODEL_NAME,

View File

@@ -74,6 +74,7 @@ export const env = createEnv({
WAND_OPENAI_MODEL_NAME: z.string().optional(), // Wand generation OpenAI model name (works with both regular OpenAI and Azure OpenAI)
OCR_AZURE_ENDPOINT: z.string().url().optional(), // Azure Mistral OCR service endpoint
OCR_AZURE_MODEL_NAME: z.string().optional(), // Azure Mistral OCR model name for document processing
OCR_AZURE_API_KEY: z.string().min(1).optional(), // Azure Mistral OCR API key
// Monitoring & Analytics
TELEMETRY_ENDPOINT: z.string().url().optional(), // Custom telemetry/analytics endpoint
@@ -96,6 +97,7 @@ export const env = createEnv({
// Background Jobs & Scheduling
TRIGGER_SECRET_KEY: z.string().min(1).optional(), // Trigger.dev secret key for background jobs
TRIGGER_DEV_ENABLED: z.boolean().optional(), // Toggle to enable/disable Trigger.dev for async jobs
CRON_SECRET: z.string().optional(), // Secret for authenticating cron job requests
JOB_RETENTION_DAYS: z.string().optional().default('1'), // Days to retain job logs/data
@@ -216,6 +218,9 @@ export const env = createEnv({
NEXT_PUBLIC_BRAND_ACCENT_COLOR: z.string().regex(/^#[0-9A-Fa-f]{6}$/).optional(), // Accent brand color (hex format)
NEXT_PUBLIC_BRAND_ACCENT_HOVER_COLOR: z.string().regex(/^#[0-9A-Fa-f]{6}$/).optional(), // Accent brand hover state (hex format)
NEXT_PUBLIC_BRAND_BACKGROUND_COLOR: z.string().regex(/^#[0-9A-Fa-f]{6}$/).optional(), // Brand background color (hex format)
// Feature Flags
NEXT_PUBLIC_TRIGGER_DEV_ENABLED: z.boolean().optional(), // Client-side gate for async executions UI
},
// Variables available on both server and client
@@ -249,6 +254,7 @@ export const env = createEnv({
NEXT_PUBLIC_BRAND_ACCENT_COLOR: process.env.NEXT_PUBLIC_BRAND_ACCENT_COLOR,
NEXT_PUBLIC_BRAND_ACCENT_HOVER_COLOR: process.env.NEXT_PUBLIC_BRAND_ACCENT_HOVER_COLOR,
NEXT_PUBLIC_BRAND_BACKGROUND_COLOR: process.env.NEXT_PUBLIC_BRAND_BACKGROUND_COLOR,
NEXT_PUBLIC_TRIGGER_DEV_ENABLED: process.env.NEXT_PUBLIC_TRIGGER_DEV_ENABLED,
NODE_ENV: process.env.NODE_ENV,
NEXT_TELEMETRY_DISABLED: process.env.NEXT_TELEMETRY_DISABLED,
},

View File

@@ -603,4 +603,133 @@ describe('workflow store', () => {
expect(childBlock.data?.extent).toBe('parent')
})
})
describe('updateBlockName', () => {
beforeEach(() => {
useWorkflowStore.setState({
blocks: {},
edges: [],
loops: {},
parallels: {},
})
const { addBlock } = useWorkflowStore.getState()
addBlock('block1', 'agent', 'Column AD', { x: 0, y: 0 })
addBlock('block2', 'function', 'Employee Length', { x: 100, y: 0 })
addBlock('block3', 'trigger', 'Start', { x: 200, y: 0 })
})
it('should have test blocks set up correctly', () => {
const state = useWorkflowStore.getState()
expect(state.blocks.block1).toBeDefined()
expect(state.blocks.block1.name).toBe('Column AD')
expect(state.blocks.block2).toBeDefined()
expect(state.blocks.block2.name).toBe('Employee Length')
expect(state.blocks.block3).toBeDefined()
expect(state.blocks.block3.name).toBe('Start')
})
it('should successfully rename a block when no conflicts exist', () => {
const { updateBlockName } = useWorkflowStore.getState()
const result = updateBlockName('block1', 'Data Processor')
expect(result).toBe(true)
const state = useWorkflowStore.getState()
expect(state.blocks.block1.name).toBe('Data Processor')
})
it('should allow renaming a block to a different case/spacing of its current name', () => {
const { updateBlockName } = useWorkflowStore.getState()
const result = updateBlockName('block1', 'column ad')
expect(result).toBe(true)
const state = useWorkflowStore.getState()
expect(state.blocks.block1.name).toBe('column ad')
})
it('should prevent renaming when another block has the same normalized name', () => {
const { updateBlockName } = useWorkflowStore.getState()
const result = updateBlockName('block2', 'Column AD')
expect(result).toBe(false)
const state = useWorkflowStore.getState()
expect(state.blocks.block2.name).toBe('Employee Length')
})
it('should prevent renaming when another block has a name that normalizes to the same value', () => {
const { updateBlockName } = useWorkflowStore.getState()
const result = updateBlockName('block2', 'columnad')
expect(result).toBe(false)
const state = useWorkflowStore.getState()
expect(state.blocks.block2.name).toBe('Employee Length')
})
it('should prevent renaming when another block has a similar name with different spacing', () => {
const { updateBlockName } = useWorkflowStore.getState()
const result = updateBlockName('block3', 'employee length')
expect(result).toBe(false)
const state = useWorkflowStore.getState()
expect(state.blocks.block3.name).toBe('Start')
})
it('should handle edge cases with empty or whitespace-only names', () => {
const { updateBlockName } = useWorkflowStore.getState()
const result1 = updateBlockName('block1', '')
expect(result1).toBe(true)
const result2 = updateBlockName('block2', ' ')
expect(result2).toBe(true)
const state = useWorkflowStore.getState()
expect(state.blocks.block1.name).toBe('')
expect(state.blocks.block2.name).toBe(' ')
})
it('should return false when trying to rename a non-existent block', () => {
const { updateBlockName } = useWorkflowStore.getState()
const result = updateBlockName('nonexistent', 'New Name')
expect(result).toBe(false)
})
it('should handle complex normalization cases correctly', () => {
const { updateBlockName } = useWorkflowStore.getState()
const conflictingNames = [
'column ad',
'COLUMN AD',
'Column AD',
'columnad',
'ColumnAD',
'COLUMNAD',
]
for (const name of conflictingNames) {
const result = updateBlockName('block2', name)
expect(result).toBe(false)
}
const result = updateBlockName('block2', 'Unique Name')
expect(result).toBe(true)
const state = useWorkflowStore.getState()
expect(state.blocks.block2.name).toBe('Unique Name')
})
})
})

View File

@@ -601,7 +601,33 @@ export const useWorkflowStore = create<WorkflowStoreWithHistory>()(
updateBlockName: (id: string, name: string) => {
const oldBlock = get().blocks[id]
if (!oldBlock) return
if (!oldBlock) return false
// Helper function to normalize block names (same as resolver)
const normalizeBlockName = (blockName: string): string => {
return blockName.toLowerCase().replace(/\s+/g, '')
}
// Check for normalized name collisions
const normalizedNewName = normalizeBlockName(name)
const currentBlocks = get().blocks
// Find any other block with the same normalized name
const conflictingBlock = Object.entries(currentBlocks).find(([blockId, block]) => {
return (
blockId !== id && // Different block
block.name && // Has a name
normalizeBlockName(block.name) === normalizedNewName // Same normalized name
)
})
if (conflictingBlock) {
// Don't allow the rename - another block already uses this normalized name
logger.error(
`Cannot rename block to "${name}" - another block "${conflictingBlock[1].name}" already uses the normalized name "${normalizedNewName}"`
)
return false
}
// Create a new state with the updated block name
const newState = {
@@ -696,6 +722,8 @@ export const useWorkflowStore = create<WorkflowStoreWithHistory>()(
pushHistory(set, get, newState, `${name} block name updated`)
get().updateLastSaved()
// Note: Socket.IO handles real-time sync automatically
return true
},
toggleBlockWide: (id: string) => {

View File

@@ -183,7 +183,7 @@ export interface WorkflowActions {
toggleBlockEnabled: (id: string) => void
duplicateBlock: (id: string) => void
toggleBlockHandles: (id: string) => void
updateBlockName: (id: string, name: string) => void
updateBlockName: (id: string, name: string) => boolean
toggleBlockWide: (id: string) => void
setBlockWide: (id: string, isWide: boolean) => void
setBlockAdvancedMode: (id: string, advancedMode: boolean) => void

View File

@@ -314,6 +314,42 @@ The following table lists the configurable parameters and their default values.
| `migrations.podSecurityContext` | Migrations pod security context | `fsGroup: 1001` |
| `migrations.securityContext` | Migrations container security context | `runAsNonRoot: true, runAsUser: 1001` |
### CronJob Parameters
| Parameter | Description | Default |
|-----------|-------------|---------|
| `cronjobs.enabled` | Enable all scheduled cron jobs | `true` |
| `cronjobs.image.repository` | CronJob image repository for HTTP requests | `curlimages/curl` |
| `cronjobs.image.tag` | CronJob image tag | `8.5.0` |
| `cronjobs.image.pullPolicy` | CronJob image pull policy | `IfNotPresent` |
| `cronjobs.resources` | CronJob resource limits and requests | See values.yaml |
| `cronjobs.restartPolicy` | CronJob pod restart policy | `OnFailure` |
| `cronjobs.activeDeadlineSeconds` | CronJob active deadline in seconds | `300` |
| `cronjobs.startingDeadlineSeconds` | CronJob starting deadline in seconds | `60` |
| `cronjobs.podSecurityContext` | CronJob pod security context | `fsGroup: 1001` |
| `cronjobs.securityContext` | CronJob container security context | `runAsNonRoot: true, runAsUser: 1001` |
| `cronjobs.jobs.scheduleExecution.enabled` | Enable schedule execution cron job | `true` |
| `cronjobs.jobs.scheduleExecution.name` | Schedule execution job name | `schedule-execution` |
| `cronjobs.jobs.scheduleExecution.schedule` | Schedule execution cron schedule | `"*/1 * * * *"` |
| `cronjobs.jobs.scheduleExecution.path` | Schedule execution API path | `"/api/schedules/execute"` |
| `cronjobs.jobs.scheduleExecution.concurrencyPolicy` | Schedule execution concurrency policy | `Forbid` |
| `cronjobs.jobs.scheduleExecution.successfulJobsHistoryLimit` | Schedule execution successful jobs history | `3` |
| `cronjobs.jobs.scheduleExecution.failedJobsHistoryLimit` | Schedule execution failed jobs history | `1` |
| `cronjobs.jobs.gmailWebhookPoll.enabled` | Enable Gmail webhook polling cron job | `true` |
| `cronjobs.jobs.gmailWebhookPoll.name` | Gmail webhook polling job name | `gmail-webhook-poll` |
| `cronjobs.jobs.gmailWebhookPoll.schedule` | Gmail webhook polling cron schedule | `"*/1 * * * *"` |
| `cronjobs.jobs.gmailWebhookPoll.path` | Gmail webhook polling API path | `"/api/webhooks/poll/gmail"` |
| `cronjobs.jobs.gmailWebhookPoll.concurrencyPolicy` | Gmail webhook polling concurrency policy | `Forbid` |
| `cronjobs.jobs.gmailWebhookPoll.successfulJobsHistoryLimit` | Gmail webhook polling successful jobs history | `3` |
| `cronjobs.jobs.gmailWebhookPoll.failedJobsHistoryLimit` | Gmail webhook polling failed jobs history | `1` |
| `cronjobs.jobs.outlookWebhookPoll.enabled` | Enable Outlook webhook polling cron job | `true` |
| `cronjobs.jobs.outlookWebhookPoll.name` | Outlook webhook polling job name | `outlook-webhook-poll` |
| `cronjobs.jobs.outlookWebhookPoll.schedule` | Outlook webhook polling cron schedule | `"*/1 * * * *"` |
| `cronjobs.jobs.outlookWebhookPoll.path` | Outlook webhook polling API path | `"/api/webhooks/poll/outlook"` |
| `cronjobs.jobs.outlookWebhookPoll.concurrencyPolicy` | Outlook webhook polling concurrency policy | `Forbid` |
| `cronjobs.jobs.outlookWebhookPoll.successfulJobsHistoryLimit` | Outlook webhook polling successful jobs history | `3` |
| `cronjobs.jobs.outlookWebhookPoll.failedJobsHistoryLimit` | Outlook webhook polling failed jobs history | `1` |
### Shared Storage Parameters
| Parameter | Description | Default |
@@ -509,6 +545,46 @@ This creates network policies that:
- Permit DNS resolution and HTTPS egress
- Support custom ingress/egress rules
### CronJobs for Scheduled Tasks
Enable automated scheduled tasks functionality:
```yaml
cronjobs:
enabled: true
# Customize individual jobs
jobs:
scheduleExecution:
enabled: true
schedule: "*/1 * * * *" # Every minute
gmailWebhookPoll:
enabled: true
schedule: "*/1 * * * *" # Every minute
outlookWebhookPoll:
enabled: true
schedule: "*/1 * * * *" # Every minute
# Global job configuration
resources:
limits:
memory: "256Mi"
cpu: "200m"
requests:
memory: "128Mi"
cpu: "100m"
```
This creates Kubernetes CronJob resources that:
- Execute HTTP requests to your application's API endpoints
- Handle retries and error logging automatically
- Use minimal resources with curl-based containers
- Support individual enable/disable per job
- Follow Kubernetes security best practices
### High Availability
Configure pod disruption budgets and anti-affinity:

View File

@@ -0,0 +1,90 @@
{{- if .Values.cronjobs.enabled }}
{{- range $jobKey, $jobConfig := .Values.cronjobs.jobs }}
{{- if $jobConfig.enabled }}
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: {{ include "sim.fullname" $ }}-{{ $jobConfig.name }}
labels:
{{- include "sim.labels" $ | nindent 4 }}
app.kubernetes.io/component: cronjob-{{ $jobConfig.name }}
spec:
schedule: {{ $jobConfig.schedule | quote }}
concurrencyPolicy: {{ $jobConfig.concurrencyPolicy | default "Forbid" }}
successfulJobsHistoryLimit: {{ $jobConfig.successfulJobsHistoryLimit | default 3 }}
failedJobsHistoryLimit: {{ $jobConfig.failedJobsHistoryLimit | default 1 }}
{{- with $.Values.cronjobs.startingDeadlineSeconds }}
startingDeadlineSeconds: {{ . }}
{{- end }}
jobTemplate:
spec:
{{- with $.Values.cronjobs.activeDeadlineSeconds }}
activeDeadlineSeconds: {{ . }}
{{- end }}
template:
metadata:
labels:
{{- include "sim.selectorLabels" $ | nindent 12 }}
app.kubernetes.io/component: cronjob-{{ $jobConfig.name }}
spec:
restartPolicy: {{ $.Values.cronjobs.restartPolicy | default "OnFailure" }}
{{- with $.Values.cronjobs.podSecurityContext }}
securityContext:
{{- toYaml . | nindent 12 }}
{{- end }}
containers:
- name: {{ $jobConfig.name }}
image: "{{ $.Values.cronjobs.image.repository }}:{{ $.Values.cronjobs.image.tag }}"
imagePullPolicy: {{ $.Values.cronjobs.image.pullPolicy }}
{{- with $.Values.cronjobs.securityContext }}
securityContext:
{{- toYaml . | nindent 14 }}
{{- end }}
command:
- /bin/sh
- -c
args:
- |
echo "Starting cron job: {{ $jobConfig.name }}"
echo "Making HTTP request to {{ $jobConfig.path }}"
# Determine the service URL (use internal service regardless of ingress)
SERVICE_URL="http://{{ include "sim.fullname" $ }}-app:{{ $.Values.app.service.port }}"
# Make the HTTP request with timeout and retry logic
for i in $(seq 1 3); do
echo "Attempt $i/3"
if curl -f -s -S --max-time 60 --retry 2 --retry-delay 5 \
-H "Content-Type: application/json" \
-H "User-Agent: Kubernetes-CronJob/{{ $jobConfig.name }}" \
"$SERVICE_URL{{ $jobConfig.path }}"; then
echo "Success: HTTP request completed"
exit 0
fi
echo "Attempt $i failed, retrying..."
sleep 10
done
echo "Error: All attempts failed"
exit 1
resources:
{{- toYaml $.Values.cronjobs.resources | nindent 14 }}
{{- with $.Values.global.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with $.Values.app.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with $.Values.affinity }}
affinity:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with $.Values.tolerations }}
tolerations:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}

View File

@@ -582,6 +582,68 @@ affinity: {}
# Tolerations for scheduling on tainted nodes
tolerations: []
# CronJob configuration for scheduled tasks
cronjobs:
# Enable/disable all cron jobs
enabled: true
# Individual job configurations
jobs:
scheduleExecution:
enabled: true
name: schedule-execution
schedule: "*/1 * * * *"
path: "/api/schedules/execute"
concurrencyPolicy: Forbid
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 1
gmailWebhookPoll:
enabled: true
name: gmail-webhook-poll
schedule: "*/1 * * * *"
path: "/api/webhooks/poll/gmail"
concurrencyPolicy: Forbid
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 1
outlookWebhookPoll:
enabled: true
name: outlook-webhook-poll
schedule: "*/1 * * * *"
path: "/api/webhooks/poll/outlook"
concurrencyPolicy: Forbid
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 1
# Global CronJob settings
image:
repository: curlimages/curl
tag: 8.5.0
pullPolicy: IfNotPresent
resources:
limits:
memory: "128Mi"
cpu: "100m"
requests:
memory: "64Mi"
cpu: "50m"
restartPolicy: OnFailure
activeDeadlineSeconds: 300
startingDeadlineSeconds: 60
# Pod security context
podSecurityContext:
fsGroup: 1001
# Container security context
securityContext:
runAsNonRoot: true
runAsUser: 1001
# Observability and telemetry configuration
telemetry:
# Enable/disable telemetry collection