Compare commits

...

8 Commits

Author SHA1 Message Date
Vikhyath Mondreti
7f82ed381a v0.4.13: bugfixes for dev containers, posthog redirect, helm updates 2025-10-13 14:21:18 -07:00
Vikhyath Mondreti
3dd36a8a35 fix(schedules): migrate to trigger dev (#1618)
* fix(schedules): add cron job auth like other cron routes

* migrate schedules to trigger dev

* remove executions check

* fix tests
2025-10-13 13:12:55 -07:00
Waleed
09cccd5487 feat(helm): added cert-postgresql template to helm (#1620)
* feat(helm): added cert-postgresql template to helm

* use js-tiktoken (pure js) in favor of tiktoken (wasm)
2025-10-13 12:41:43 -07:00
Waleed
1773530325 fix(sdk): fixed ts sdk publishing job to use later node version (#1619) 2025-10-13 12:13:50 -07:00
Waleed
2da7a6755c feat(helm): added pdb to helm (#1617)
* feat(helm): added pdb to helm

* add additional config
2025-10-13 12:06:30 -07:00
Waleed
1e81cd6850 fix(kb): added tiktoken for embedding token estimation (#1616)
* fix(kb): added tiktoken for embedding token estimation

* added missing mock
2025-10-13 11:53:50 -07:00
Vikhyath Mondreti
ec73e2e9ce fix(posthog): prevent redirects by whitelisting ingest route (#1615) 2025-10-13 11:08:48 -07:00
Waleed
4937d72d70 fix(devcontainer): fixed devcontainers (#1614) 2025-10-12 18:01:37 -07:00
31 changed files with 1468 additions and 1135 deletions
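Commits #1616 and #1620 add token counting for knowledge-base embeddings and then swap the WASM `tiktoken` package for the pure-JS `js-tiktoken`. A hedged sketch of what such an estimate typically looks like with js-tiktoken; the encoding choice and helper name are assumptions, not the repo's actual code:

```typescript
// Hedged sketch only: estimating embedding token counts with js-tiktoken.
// Encoding name and helper are illustrative, not the repo's actual helper.
import { getEncoding } from 'js-tiktoken'

const enc = getEncoding('cl100k_base') // pure-JS tokenizer, no WASM required

export function estimateEmbeddingTokens(chunks: string[]): number {
  // encode() returns an array of token ids; sum the lengths across chunks.
  return chunks.reduce((total, chunk) => total + enc.encode(chunk).length, 0)
}

// Usage: estimateEmbeddingTokens(['hello world', 'second chunk'])
```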

View File

@@ -1,69 +0,0 @@
# Sim Development Environment Bashrc
# This gets sourced by post-create.sh
# Enhanced prompt with git branch info
parse_git_branch() {
git branch 2> /dev/null | sed -e '/^[^*]/d' -e 's/* \(.*\)/ (\1)/'
}
export PS1="\[\033[01;32m\]\u@simstudio\[\033[00m\]:\[\033[01;34m\]\w\[\033[33m\]\$(parse_git_branch)\[\033[00m\]\$ "
# Helpful aliases
alias ll="ls -la"
alias ..="cd .."
alias ...="cd ../.."
# Database aliases
alias pgc="PGPASSWORD=postgres psql -h db -U postgres -d simstudio"
alias check-db="PGPASSWORD=postgres psql -h db -U postgres -c '\l'"
# Sim specific aliases
alias logs="cd /workspace/apps/sim && tail -f logs/*.log 2>/dev/null || echo 'No log files found'"
alias sim-start="cd /workspace && bun run dev"
alias sim-migrate="cd /workspace/apps/sim && bunx drizzle-kit push"
alias sim-generate="cd /workspace/apps/sim && bunx drizzle-kit generate"
alias sim-rebuild="cd /workspace && bun run build && bun run start"
alias docs-dev="cd /workspace/apps/docs && bun run dev"
# Turbo related commands
alias turbo-build="cd /workspace && bunx turbo run build"
alias turbo-dev="cd /workspace && bunx turbo run dev"
alias turbo-test="cd /workspace && bunx turbo run test"
# Bun specific commands
alias bun-update="cd /workspace && bun update"
alias bun-add="cd /workspace && bun add"
alias bun-pm="cd /workspace && bun pm"
alias bun-canary="bun upgrade --canary"
# Default to workspace directory
cd /workspace 2>/dev/null || true
# Welcome message - only show once per session
if [ -z "$SIM_WELCOME_SHOWN" ]; then
export SIM_WELCOME_SHOWN=1
echo ""
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo "🚀 Welcome to Sim development environment!"
echo ""
echo "Available commands:"
echo " sim-start - Start all apps in development mode"
echo " sim-migrate - Push schema changes to the database for sim app"
echo " sim-generate - Generate new migrations for sim app"
echo " sim-rebuild - Build and start all apps"
echo " docs-dev - Start only the docs app in development mode"
echo ""
echo "Turbo commands:"
echo " turbo-build - Build all apps using Turborepo"
echo " turbo-dev - Start development mode for all apps"
echo " turbo-test - Run tests for all packages"
echo ""
echo "Bun commands:"
echo " bun-update - Update dependencies"
echo " bun-add - Add a new dependency"
echo " bun-pm - Manage dependencies"
echo " bun-canary - Upgrade to the latest canary version of Bun"
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo ""
fi

View File

@@ -1,38 +1,43 @@
# Use the latest Bun canary image for development
FROM oven/bun:canary
# Avoid warnings by switching to noninteractive
ENV DEBIAN_FRONTEND=noninteractive
FROM oven/bun:1.2.22-alpine
# Install necessary packages for development
RUN apt-get update \
&& apt-get -y install --no-install-recommends \
git curl wget jq sudo postgresql-client vim nano \
bash-completion ca-certificates lsb-release gnupg \
&& apt-get clean -y \
&& rm -rf /var/lib/apt/lists/*
RUN apk add --no-cache \
git \
curl \
wget \
jq \
sudo \
postgresql-client \
vim \
nano \
bash \
bash-completion \
zsh \
zsh-vcs \
ca-certificates \
shadow
# Create a non-root user
# Create a non-root user with matching UID/GID
ARG USERNAME=bun
ARG USER_UID=1000
ARG USER_GID=$USER_UID
# Create user group if it doesn't exist
RUN if ! getent group $USER_GID >/dev/null; then \
addgroup -g $USER_GID $USERNAME; \
fi
# Create user if it doesn't exist
RUN if ! getent passwd $USER_UID >/dev/null; then \
adduser -D -u $USER_UID -G $(getent group $USER_GID | cut -d: -f1) $USERNAME; \
fi
# Add sudo support
RUN echo "$USERNAME ALL=(ALL) NOPASSWD: ALL" > /etc/sudoers.d/$USERNAME \
&& chmod 0440 /etc/sudoers.d/$USERNAME
# Install global packages for development
RUN bun install -g turbo drizzle-kit typescript @types/node
# Install bun completions
RUN bun completions > /etc/bash_completion.d/bun
# Set up shell environment
RUN echo "export PATH=$PATH:/home/$USERNAME/.bun/bin" >> /etc/profile
RUN echo "source /etc/profile" >> /etc/bash.bashrc
# Switch back to dialog for any ad-hoc use of apt-get
ENV DEBIAN_FRONTEND=dialog
RUN echo "export PATH=\$PATH:/home/$USERNAME/.bun/bin" >> /etc/profile
WORKDIR /workspace
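For debugging the image outside VS Code, it should be possible to build it directly from the repository root with something like `docker build -f .devcontainer/Dockerfile -t sim-devcontainer .` (the compose file below uses the repository root as the build context); the tag name here is only an example.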

View File

@@ -1,78 +1,75 @@
# Sim Development Container
This directory contains configuration files for Visual Studio Code Dev Containers / GitHub Codespaces. Dev containers provide a consistent, isolated development environment for this project.
Development container configuration for VS Code Dev Containers and GitHub Codespaces.
## Contents
- `devcontainer.json` - The main configuration file that defines the development container settings
- `Dockerfile` - Defines the container image and development environment
- `docker-compose.yml` - Sets up the application and database containers
- `post-create.sh` - Script that runs when the container is created
- `.bashrc` - Custom shell configuration with helpful aliases
## Usage
### Prerequisites
## Prerequisites
- Visual Studio Code
- Docker installation:
- Docker Desktop (Windows/macOS)
- Docker Engine (Linux)
- VS Code Remote - Containers extension
- Docker Desktop or Podman Desktop
- VS Code Dev Containers extension
### Getting Started
## Getting Started
1. Open this project in Visual Studio Code
2. When prompted, click "Reopen in Container"
- Alternatively, press `F1` and select "Remote-Containers: Reopen in Container"
1. Open this project in VS Code
2. Click "Reopen in Container" when prompted (or press `F1` → "Dev Containers: Reopen in Container")
3. Wait for the container to build and initialize
4. The post-creation script will automatically:
4. Start developing with `sim-start`
- Install dependencies
- Set up environment variables
- Run database migrations
- Configure helpful aliases
The setup script will automatically install dependencies and run migrations.
5. Start the application with `sim-start` (alias for `bun run dev`)
## Development Commands
### Development Commands
### Running Services
The development environment includes these helpful aliases:
You have two options for running the development environment:
**Option 1: Run everything together (recommended for most development)**
```bash
sim-start # Runs both app and socket server using concurrently
```
**Option 2: Run services separately (useful for debugging individual services)**
- In the **app** container terminal: `sim-app` (starts Next.js app on port 3000)
- In the **realtime** container terminal: `sim-sockets` (starts socket server on port 3002)
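Under the hood these aliases map to the workspace scripts defined in `sim-commands.sh` later in this diff: `sim-app` runs `bun run dev` and `sim-sockets` runs `bun run dev:sockets`, so the two services can also be started directly in separate terminals.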
### Other Commands
- `sim-start` - Start the development server
- `sim-migrate` - Push schema changes to the database
- `sim-generate` - Generate new migrations
- `sim-rebuild` - Build and start the production version
- `pgc` - Connect to the PostgreSQL database
- `check-db` - List all databases
### Using GitHub Codespaces
This project is also configured for GitHub Codespaces. To use it:
1. Go to the GitHub repository
2. Click the "Code" button
3. Select the "Codespaces" tab
4. Click "Create codespace on main"
This will start a new Codespace with the development environment already set up.
## Customization
You can customize the development environment by:
- Modifying `devcontainer.json` to add VS Code extensions or settings
- Updating the `Dockerfile` to install additional packages
- Editing `docker-compose.yml` to add services or change configuration
- Modifying `.bashrc` to add custom aliases or configurations
- `build` - Build the application
- `pgc` - Connect to PostgreSQL database
## Troubleshooting
If you encounter issues:
**Build errors**: Rebuild the container with `F1` → "Dev Containers: Rebuild Container"
1. Rebuild the container: `F1` → "Remote-Containers: Rebuild Container"
2. Check Docker logs for build errors
3. Verify Docker Desktop is running
4. Ensure all prerequisites are installed
**Port conflicts**: Ensure ports 3000, 3002, and 5432 are available
For more information, see the [VS Code Remote Development documentation](https://code.visualstudio.com/docs/remote/containers).
**Container runtime issues**: Verify Docker Desktop or Podman Desktop is running
## Technical Details
Services:
- **App container** (8GB memory limit) - Main Next.js application
- **Realtime container** (4GB memory limit) - Socket.io server for real-time features
- **Database** - PostgreSQL with pgvector extension
- **Migrations** - Runs automatically on container creation
You can develop with services running together or independently.
### Personalization
**Project commands** (`sim-start`, `sim-app`, etc.) are automatically available via `/workspace/.devcontainer/sim-commands.sh`.
**Personal shell customization** (aliases, prompts, etc.) should use VS Code's dotfiles feature:
1. Create a dotfiles repository (e.g., `github.com/youruser/dotfiles`)
2. Add your `.bashrc`, `.zshrc`, or other configs
3. Configure in VS Code Settings:
```json
{
"dotfiles.repository": "youruser/dotfiles",
"dotfiles.installCommand": "install.sh"
}
```
This separates project-specific commands from personal preferences, following VS Code best practices.

View File

@@ -13,13 +13,6 @@
"source.fixAll.biome": "explicit",
"source.organizeImports.biome": "explicit"
},
"terminal.integrated.defaultProfile.linux": "bash",
"terminal.integrated.profiles.linux": {
"bash": {
"path": "/bin/bash",
"args": ["--login"]
}
},
"terminal.integrated.shellIntegration.enabled": true
},
"extensions": [
@@ -36,18 +29,9 @@
}
},
"forwardPorts": [3000, 5432],
"forwardPorts": [3000, 3002, 5432],
"postCreateCommand": "bash -c 'bash .devcontainer/post-create.sh || true'",
"postStartCommand": "bash -c 'if [ ! -f ~/.bashrc ] || ! grep -q \"sim-start\" ~/.bashrc; then cp .devcontainer/.bashrc ~/.bashrc; fi'",
"remoteUser": "bun",
"features": {
"ghcr.io/devcontainers/features/git:1": {},
"ghcr.io/prulloac/devcontainer-features/bun:1": {
"version": "latest"
}
}
"remoteUser": "bun"
}

View File

@@ -7,52 +7,56 @@ services:
- ..:/workspace:cached
- bun-cache:/home/bun/.bun/cache:delegated
command: sleep infinity
deploy:
resources:
limits:
memory: 8G
environment:
- NODE_ENV=development
- DATABASE_URL=postgresql://postgres:postgres@db:5432/simstudio
- BETTER_AUTH_URL=http://localhost:3000
- NEXT_PUBLIC_APP_URL=http://localhost:3000
- BETTER_AUTH_SECRET=${BETTER_AUTH_SECRET:-your_auth_secret_here}
- ENCRYPTION_KEY=${ENCRYPTION_KEY:-your_encryption_key_here}
- COPILOT_API_KEY=${COPILOT_API_KEY}
- SIM_AGENT_API_URL=${SIM_AGENT_API_URL}
- OLLAMA_URL=${OLLAMA_URL:-http://localhost:11434}
- NEXT_PUBLIC_SOCKET_URL=${NEXT_PUBLIC_SOCKET_URL:-http://localhost:3002}
- BUN_INSTALL_CACHE_DIR=/home/bun/.bun/cache
depends_on:
db:
condition: service_healthy
realtime:
condition: service_healthy
migrations:
condition: service_completed_successfully
ports:
- "3000:3000"
- "3001:3001"
working_dir: /workspace
healthcheck:
test: ['CMD', 'wget', '--spider', '--quiet', 'http://127.0.0.1:3000']
interval: 90s
timeout: 5s
retries: 3
start_period: 10s
realtime:
build:
context: ..
dockerfile: .devcontainer/Dockerfile
volumes:
- ..:/workspace:cached
- bun-cache:/home/bun/.bun/cache:delegated
command: sleep infinity
deploy:
resources:
limits:
memory: 4G
environment:
- NODE_ENV=development
- DATABASE_URL=postgresql://postgres:postgres@db:5432/simstudio
- BETTER_AUTH_URL=http://localhost:3000
- NEXT_PUBLIC_APP_URL=http://localhost:3000
- BETTER_AUTH_SECRET=${BETTER_AUTH_SECRET:-your_auth_secret_here}
depends_on:
db:
condition: service_healthy
ports:
- "3002:3002"
working_dir: /workspace
healthcheck:
test: ['CMD', 'wget', '--spider', '--quiet', 'http://127.0.0.1:3002']
interval: 90s
timeout: 5s
retries: 3
start_period: 10s
migrations:
build:

View File

@@ -8,11 +8,43 @@ echo "🔧 Setting up Sim development environment..."
# Change to the workspace root directory
cd /workspace
# Setup .bashrc
echo "📄 Setting up .bashrc with aliases..."
cp /workspace/.devcontainer/.bashrc ~/.bashrc
# Add to .profile to ensure .bashrc is sourced in non-interactive shells
echo 'if [ -f ~/.bashrc ]; then . ~/.bashrc; fi' >> ~/.profile
# Install global packages for development (done at runtime, not build time)
echo "📦 Installing global development tools..."
bun install -g turbo drizzle-kit typescript @types/node 2>/dev/null || {
echo "⚠️ Some global packages may already be installed, continuing..."
}
# Set up bun completions (with proper shell detection)
echo "🔧 Setting up shell completions..."
if [ -n "$SHELL" ] && [ -f "$SHELL" ]; then
SHELL=/bin/bash bun completions 2>/dev/null | sudo tee /etc/bash_completion.d/bun > /dev/null || {
echo "⚠️ Could not install bun completions, but continuing..."
}
fi
# Add project commands to shell profile
echo "📄 Setting up project commands..."
# Add sourcing of sim-commands.sh to user's shell config files if they exist
for rcfile in ~/.bashrc ~/.zshrc; do
if [ -f "$rcfile" ]; then
# Check if already added
if ! grep -q "sim-commands.sh" "$rcfile"; then
echo "" >> "$rcfile"
echo "# Sim project commands" >> "$rcfile"
echo "if [ -f /workspace/.devcontainer/sim-commands.sh ]; then" >> "$rcfile"
echo " source /workspace/.devcontainer/sim-commands.sh" >> "$rcfile"
echo "fi" >> "$rcfile"
fi
fi
done
# If no rc files exist yet, create a minimal one
if [ ! -f ~/.bashrc ] && [ ! -f ~/.zshrc ]; then
echo "# Source Sim project commands" > ~/.bashrc
echo "if [ -f /workspace/.devcontainer/sim-commands.sh ]; then" >> ~/.bashrc
echo " source /workspace/.devcontainer/sim-commands.sh" >> ~/.bashrc
echo "fi" >> ~/.bashrc
fi
# Clean and reinstall dependencies to ensure platform compatibility
echo "📦 Cleaning and reinstalling dependencies..."
@@ -29,18 +61,12 @@ chmod 700 ~/.bun ~/.bun/cache
# Install dependencies with platform-specific binaries
echo "Installing dependencies with Bun..."
bun install || {
echo "⚠️ bun install had issues but continuing setup..."
}
bun install
# Check for native dependencies
echo "Checking for native dependencies compatibility..."
NATIVE_DEPS=$(grep '"trustedDependencies"' apps/sim/package.json || echo "")
if [ ! -z "$NATIVE_DEPS" ]; then
echo "⚠️ Native dependencies detected. Ensuring compatibility with Bun..."
for pkg in $(echo $NATIVE_DEPS | grep -oP '"[^"]*"' | tr -d '"' | grep -v "trustedDependencies"); do
echo "Checking compatibility for $pkg..."
done
if grep -q '"trustedDependencies"' apps/sim/package.json 2>/dev/null; then
echo "⚠️ Native dependencies detected. Bun will handle compatibility during install."
fi
# Set up environment variables if .env doesn't exist for the sim app
@@ -82,23 +108,6 @@ echo "Waiting for database to be ready..."
fi
) || echo "⚠️ Database setup had issues but continuing..."
# Add additional helpful aliases to .bashrc
cat << EOF >> ~/.bashrc
# Additional Sim Development Aliases
alias migrate="cd /workspace/apps/sim && DATABASE_URL=postgresql://postgres:postgres@db:5432/simstudio bunx drizzle-kit push"
alias generate="cd /workspace/apps/sim && bunx drizzle-kit generate"
alias dev="cd /workspace && bun run dev"
alias build="cd /workspace && bun run build"
alias start="cd /workspace && bun run dev"
alias lint="cd /workspace/apps/sim && bun run lint"
alias test="cd /workspace && bun run test"
alias bun-update="cd /workspace && bun update"
EOF
# Source the .bashrc to make aliases available immediately
. ~/.bashrc
# Clear the welcome message flag to ensure it shows after setup
unset SIM_WELCOME_SHOWN
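If setup is interrupted, the same script can be re-run manually from a terminal inside the container with `bash .devcontainer/post-create.sh`, which is the command the devcontainer's `postCreateCommand` invokes.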

.devcontainer/sim-commands.sh (new executable file, 42 lines)
View File

@@ -0,0 +1,42 @@
#!/bin/bash
# Sim Project Commands
# Source this file to add project-specific commands to your shell
# Add to your ~/.bashrc or ~/.zshrc: source /workspace/.devcontainer/sim-commands.sh
# Project-specific aliases for Sim development
alias sim-start="cd /workspace && bun run dev:full"
alias sim-app="cd /workspace && bun run dev"
alias sim-sockets="cd /workspace && bun run dev:sockets"
alias sim-migrate="cd /workspace/apps/sim && bunx drizzle-kit push"
alias sim-generate="cd /workspace/apps/sim && bunx drizzle-kit generate"
alias sim-rebuild="cd /workspace && bun run build && bun run start"
alias docs-dev="cd /workspace/apps/docs && bun run dev"
# Database connection helpers
alias pgc="PGPASSWORD=postgres psql -h db -U postgres -d simstudio"
alias check-db="PGPASSWORD=postgres psql -h db -U postgres -c '\l'"
# Default to workspace directory
cd /workspace 2>/dev/null || true
# Welcome message - show once per session
if [ -z "$SIM_WELCOME_SHOWN" ]; then
export SIM_WELCOME_SHOWN=1
echo ""
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo "🚀 Sim Development Environment"
echo ""
echo "Project commands:"
echo " sim-start - Start app + socket server"
echo " sim-app - Start only main app"
echo " sim-sockets - Start only socket server"
echo " sim-migrate - Push schema changes"
echo " sim-generate - Generate migrations"
echo ""
echo "Database:"
echo " pgc - Connect to PostgreSQL"
echo " check-db - List databases"
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo ""
fi

View File

@@ -21,7 +21,7 @@ jobs:
- name: Setup Node.js for npm publishing
uses: actions/setup-node@v4
with:
node-version: '18'
node-version: '22'
registry-url: 'https://registry.npmjs.org/'
- name: Install dependencies

View File

@@ -9,23 +9,22 @@ import {
InvalidRequestError,
} from '@/app/api/files/utils'
// Allowlist of permitted file extensions for security
const ALLOWED_EXTENSIONS = new Set([
// Documents
'pdf',
'doc',
'docx',
'txt',
'md',
// Images (safe formats)
'png',
'jpg',
'jpeg',
'gif',
// Data files
'csv',
'xlsx',
'xls',
'json',
'yaml',
'yml',
])
/**
@@ -50,19 +49,16 @@ export async function POST(request: NextRequest) {
const formData = await request.formData()
// Check if multiple files are being uploaded or a single file
const files = formData.getAll('file') as File[]
if (!files || files.length === 0) {
throw new InvalidRequestError('No files provided')
}
// Get optional scoping parameters for execution-scoped storage
const workflowId = formData.get('workflowId') as string | null
const executionId = formData.get('executionId') as string | null
const workspaceId = formData.get('workspaceId') as string | null
// Log storage mode
const usingCloudStorage = isUsingCloudStorage()
logger.info(`Using storage mode: ${usingCloudStorage ? 'Cloud' : 'Local'} for file upload`)
@@ -74,7 +70,6 @@ export async function POST(request: NextRequest) {
const uploadResults = []
// Process each file
for (const file of files) {
const originalName = file.name
@@ -88,9 +83,7 @@ export async function POST(request: NextRequest) {
const bytes = await file.arrayBuffer()
const buffer = Buffer.from(bytes)
// For execution-scoped files, use the dedicated execution file storage
if (workflowId && executionId) {
// Use the dedicated execution file storage system
const { uploadExecutionFile } = await import('@/lib/workflows/execution-file-storage')
const userFile = await uploadExecutionFile(
{
@@ -107,13 +100,10 @@ export async function POST(request: NextRequest) {
continue
}
// Upload to cloud or local storage using the standard uploadFile function
try {
logger.info(`Uploading file: ${originalName}`)
const result = await uploadFile(buffer, originalName, file.type, file.size)
// Generate a presigned URL for cloud storage with appropriate expiry
// Regular files get 24 hours (execution files are handled above)
let presignedUrl: string | undefined
if (usingCloudStorage) {
try {
@@ -144,7 +134,6 @@ export async function POST(request: NextRequest) {
}
}
// Return all file information
if (uploadResults.length === 1) {
return NextResponse.json(uploadResults[0])
}
@@ -155,7 +144,6 @@ export async function POST(request: NextRequest) {
}
}
// Handle preflight requests
export async function OPTIONS() {
return createOptionsResponse()
}
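The validation helper that consumes `ALLOWED_EXTENSIONS` is elided from this hunk. A minimal sketch of how an extension allowlist check of this kind typically works; the function name is hypothetical and the list is abbreviated, not the route's actual code:

```typescript
// Hedged sketch only: reject uploads whose extension is not on the allowlist.
// Names and the abbreviated set below are illustrative.
const ALLOWED_EXTENSIONS = new Set(['pdf', 'doc', 'docx', 'txt', 'md', 'png', 'jpg', 'csv', 'json'])

class InvalidRequestError extends Error {}

function assertAllowedExtension(fileName: string): void {
  // Take the text after the last dot, lower-cased; names without a dot fail the check.
  const extension = fileName.split('.').pop()?.toLowerCase() ?? ''
  if (!extension || !ALLOWED_EXTENSIONS.has(extension)) {
    throw new InvalidRequestError(`File extension ".${extension}" is not allowed`)
  }
}
```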

View File

@@ -32,6 +32,7 @@ vi.stubGlobal(
vi.mock('@/lib/env', () => ({
env: {},
getEnv: (key: string) => process.env[key],
isTruthy: (value: string | boolean | number | undefined) =>
typeof value === 'string' ? value === 'true' || value === '1' : Boolean(value),
}))

View File

@@ -17,6 +17,7 @@ vi.mock('drizzle-orm', () => ({
vi.mock('@/lib/env', () => ({
env: { OPENAI_API_KEY: 'test-key' },
getEnv: (key: string) => process.env[key],
isTruthy: (value: string | boolean | number | undefined) =>
typeof value === 'string' ? value === 'true' || value === '1' : Boolean(value),
}))

View File

@@ -3,81 +3,50 @@
*
* @vitest-environment node
*/
import type { NextRequest } from 'next/server'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import {
mockExecutionDependencies,
mockScheduleExecuteDb,
sampleWorkflowState,
} from '@/app/api/__test-utils__/utils'
function createMockRequest(): NextRequest {
const mockHeaders = new Map([
['authorization', 'Bearer test-cron-secret'],
['content-type', 'application/json'],
])
return {
headers: {
get: (key: string) => mockHeaders.get(key.toLowerCase()) || null,
},
url: 'http://localhost:3000/api/schedules/execute',
} as NextRequest
}
describe('Scheduled Workflow Execution API Route', () => {
beforeEach(() => {
vi.clearAllMocks()
vi.resetModules()
})
mockExecutionDependencies()
afterEach(() => {
vi.clearAllMocks()
vi.resetModules()
})
// Mock all dependencies
vi.doMock('@/services/queue', () => ({
RateLimiter: vi.fn().mockImplementation(() => ({
checkRateLimitWithSubscription: vi.fn().mockResolvedValue({
allowed: true,
remaining: 100,
resetAt: new Date(Date.now() + 60000),
}),
})),
it('should execute scheduled workflows with Trigger.dev disabled', async () => {
const mockExecuteScheduleJob = vi.fn().mockResolvedValue(undefined)
vi.doMock('@/lib/auth/internal', () => ({
verifyCronAuth: vi.fn().mockReturnValue(null),
}))
vi.doMock('@/lib/billing', () => ({
checkServerSideUsageLimits: vi.fn().mockResolvedValue({ isExceeded: false }),
vi.doMock('@/background/schedule-execution', () => ({
executeScheduleJob: mockExecuteScheduleJob,
}))
vi.doMock('@/lib/billing/core/subscription', () => ({
getHighestPrioritySubscription: vi.fn().mockResolvedValue({
plan: 'pro',
status: 'active',
}),
}))
vi.doMock('@/lib/environment/utils', () => ({
getPersonalAndWorkspaceEnv: vi.fn().mockResolvedValue({
personalEncrypted: {},
workspaceEncrypted: {},
}),
}))
vi.doMock('@/lib/logs/execution/logging-session', () => ({
LoggingSession: vi.fn().mockImplementation(() => ({
safeStart: vi.fn().mockResolvedValue(undefined),
safeComplete: vi.fn().mockResolvedValue(undefined),
safeCompleteWithError: vi.fn().mockResolvedValue(undefined),
setupExecutor: vi.fn(),
})),
}))
vi.doMock('@/lib/workflows/db-helpers', () => ({
loadDeployedWorkflowState: vi.fn().mockResolvedValue({
blocks: sampleWorkflowState.blocks,
edges: sampleWorkflowState.edges || [],
loops: sampleWorkflowState.loops || {},
parallels: sampleWorkflowState.parallels || {},
}),
loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({
blocks: sampleWorkflowState.blocks,
edges: sampleWorkflowState.edges || [],
loops: sampleWorkflowState.loops || {},
parallels: {},
isFromNormalizedTables: true,
}),
}))
vi.doMock('@/stores/workflows/server-utils', () => ({
mergeSubblockState: vi.fn().mockReturnValue(sampleWorkflowState.blocks),
}))
vi.doMock('@/lib/schedules/utils', () => ({
calculateNextRunTime: vi.fn().mockReturnValue(new Date(Date.now() + 60000)),
getScheduleTimeValues: vi.fn().mockReturnValue({}),
getSubBlockValue: vi.fn().mockReturnValue('manual'),
vi.doMock('@/lib/env', () => ({
env: {
TRIGGER_DEV_ENABLED: false,
},
isTruthy: vi.fn(() => false),
}))
vi.doMock('drizzle-orm', () => ({
@@ -85,198 +54,209 @@ describe('Scheduled Workflow Execution API Route', () => {
eq: vi.fn((field, value) => ({ field, value, type: 'eq' })),
lte: vi.fn((field, value) => ({ field, value, type: 'lte' })),
not: vi.fn((condition) => ({ type: 'not', condition })),
sql: vi.fn((strings, ...values) => ({ strings, values, type: 'sql' })),
}))
vi.doMock('croner', () => ({
Cron: vi.fn().mockImplementation(() => ({
nextRun: vi.fn().mockReturnValue(new Date(Date.now() + 60000)), // Next run in 1 minute
})),
}))
vi.doMock('@sim/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => ({
from: vi.fn().mockImplementation((_table: any) => ({
where: vi.fn().mockImplementation((_cond: any) => ({
limit: vi.fn().mockImplementation((n?: number) => {
// Always return empty array - no due schedules
return []
}),
})),
})),
})),
update: vi.fn().mockImplementation(() => ({
set: vi.fn().mockImplementation(() => ({
where: vi.fn().mockResolvedValue([]),
from: vi.fn().mockImplementation(() => ({
where: vi.fn().mockImplementation(() => [
{
id: 'schedule-1',
workflowId: 'workflow-1',
blockId: null,
cronExpression: null,
lastRanAt: null,
failedCount: 0,
},
]),
})),
})),
}
return {
db: mockDb,
userStats: {
userId: 'userId',
totalScheduledExecutions: 'totalScheduledExecutions',
lastActive: 'lastActive',
},
workflow: { id: 'id', userId: 'userId', state: 'state' },
workflowSchedule: {
id: 'id',
workflowId: 'workflowId',
nextRunAt: 'nextRunAt',
status: 'status',
},
workflowSchedule: {},
}
})
const { GET } = await import('@/app/api/schedules/execute/route')
const response = await GET(createMockRequest())
expect(response).toBeDefined()
expect(response.status).toBe(200)
const data = await response.json()
expect(data).toHaveProperty('message')
expect(data).toHaveProperty('executedCount', 1)
})
afterEach(() => {
vi.clearAllMocks()
})
it('should queue schedules to Trigger.dev when enabled', async () => {
const mockTrigger = vi.fn().mockResolvedValue({ id: 'task-id-123' })
it('should execute scheduled workflows successfully', async () => {
const executeMock = vi.fn().mockResolvedValue({
success: true,
output: { response: 'Scheduled execution completed' },
logs: [],
metadata: {
duration: 100,
startTime: new Date().toISOString(),
endTime: new Date().toISOString(),
vi.doMock('@/lib/auth/internal', () => ({
verifyCronAuth: vi.fn().mockReturnValue(null),
}))
vi.doMock('@trigger.dev/sdk', () => ({
tasks: {
trigger: mockTrigger,
},
})
vi.doMock('@/executor', () => ({
Executor: vi.fn().mockImplementation(() => ({
execute: executeMock,
})),
}))
const { GET } = await import('@/app/api/schedules/execute/route')
const response = await GET()
expect(response).toBeDefined()
const data = await response.json()
expect(data).toHaveProperty('message')
expect(data).toHaveProperty('executedCount')
})
it('should handle errors during scheduled execution gracefully', async () => {
vi.doMock('@/executor', () => ({
Executor: vi.fn().mockImplementation(() => ({
execute: vi.fn().mockRejectedValue(new Error('Execution failed')),
})),
vi.doMock('@/lib/env', () => ({
env: {
TRIGGER_DEV_ENABLED: true,
},
isTruthy: vi.fn(() => true),
}))
const { GET } = await import('@/app/api/schedules/execute/route')
const response = await GET()
vi.doMock('drizzle-orm', () => ({
and: vi.fn((...conditions) => ({ type: 'and', conditions })),
eq: vi.fn((field, value) => ({ field, value, type: 'eq' })),
lte: vi.fn((field, value) => ({ field, value, type: 'lte' })),
not: vi.fn((condition) => ({ type: 'not', condition })),
}))
expect(response).toBeDefined()
const data = await response.json()
expect(data).toHaveProperty('message')
})
it('should handle case with no due schedules', async () => {
vi.doMock('@sim/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => ({
from: vi.fn().mockImplementation(() => ({
where: vi.fn().mockImplementation(() => ({
limit: vi.fn().mockImplementation(() => []),
})),
})),
})),
update: vi.fn().mockImplementation(() => ({
set: vi.fn().mockImplementation(() => ({
where: vi.fn().mockResolvedValue([]),
where: vi.fn().mockImplementation(() => [
{
id: 'schedule-1',
workflowId: 'workflow-1',
blockId: null,
cronExpression: null,
lastRanAt: null,
failedCount: 0,
},
]),
})),
})),
}
return { db: mockDb }
return {
db: mockDb,
workflowSchedule: {},
}
})
const { GET } = await import('@/app/api/schedules/execute/route')
const response = await GET()
const response = await GET(createMockRequest())
expect(response).toBeDefined()
expect(response.status).toBe(200)
const data = await response.json()
expect(data).toHaveProperty('executedCount', 0)
const executeMock = vi.fn()
vi.doMock('@/executor', () => ({
Executor: vi.fn().mockImplementation(() => ({
execute: executeMock,
})),
}))
expect(executeMock).not.toHaveBeenCalled()
expect(data).toHaveProperty('executedCount', 1)
})
// Removed: Test isolation issues with mocks make this unreliable
it('should execute schedules that are explicitly marked as active', async () => {
const executeMock = vi.fn().mockResolvedValue({ success: true, metadata: {} })
vi.doMock('@/executor', () => ({
Executor: vi.fn().mockImplementation(() => ({
execute: executeMock,
})),
it('should handle case with no due schedules', async () => {
vi.doMock('@/lib/auth/internal', () => ({
verifyCronAuth: vi.fn().mockReturnValue(null),
}))
mockScheduleExecuteDb({
schedules: [
{
id: 'schedule-active',
workflowId: 'workflow-id',
userId: 'user-id',
status: 'active',
nextRunAt: new Date(Date.now() - 60_000),
lastRanAt: null,
cronExpression: null,
failedCount: 0,
},
],
vi.doMock('@/background/schedule-execution', () => ({
executeScheduleJob: vi.fn().mockResolvedValue(undefined),
}))
vi.doMock('@/lib/env', () => ({
env: {
TRIGGER_DEV_ENABLED: false,
},
isTruthy: vi.fn(() => false),
}))
vi.doMock('drizzle-orm', () => ({
and: vi.fn((...conditions) => ({ type: 'and', conditions })),
eq: vi.fn((field, value) => ({ field, value, type: 'eq' })),
lte: vi.fn((field, value) => ({ field, value, type: 'lte' })),
not: vi.fn((condition) => ({ type: 'not', condition })),
}))
vi.doMock('@sim/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => ({
from: vi.fn().mockImplementation(() => ({
where: vi.fn().mockImplementation(() => []),
})),
})),
}
return {
db: mockDb,
workflowSchedule: {},
}
})
const { GET } = await import('@/app/api/schedules/execute/route')
const response = await GET()
expect(response.status).toBe(200)
})
it('should not execute schedules that are disabled', async () => {
const executeMock = vi.fn()
vi.doMock('@/executor', () => ({
Executor: vi.fn().mockImplementation(() => ({
execute: executeMock,
})),
}))
mockScheduleExecuteDb({
schedules: [
{
id: 'schedule-disabled',
workflowId: 'workflow-id',
userId: 'user-id',
status: 'disabled',
nextRunAt: new Date(Date.now() - 60_000),
lastRanAt: null,
cronExpression: null,
failedCount: 0,
},
],
})
const { GET } = await import('@/app/api/schedules/execute/route')
const response = await GET()
const response = await GET(createMockRequest())
expect(response.status).toBe(200)
const data = await response.json()
expect(data).toHaveProperty('message')
expect(data).toHaveProperty('executedCount', 0)
})
expect(executeMock).not.toHaveBeenCalled()
it('should execute multiple schedules in parallel', async () => {
vi.doMock('@/lib/auth/internal', () => ({
verifyCronAuth: vi.fn().mockReturnValue(null),
}))
vi.doMock('@/background/schedule-execution', () => ({
executeScheduleJob: vi.fn().mockResolvedValue(undefined),
}))
vi.doMock('@/lib/env', () => ({
env: {
TRIGGER_DEV_ENABLED: false,
},
isTruthy: vi.fn(() => false),
}))
vi.doMock('drizzle-orm', () => ({
and: vi.fn((...conditions) => ({ type: 'and', conditions })),
eq: vi.fn((field, value) => ({ field, value, type: 'eq' })),
lte: vi.fn((field, value) => ({ field, value, type: 'lte' })),
not: vi.fn((condition) => ({ type: 'not', condition })),
}))
vi.doMock('@sim/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => ({
from: vi.fn().mockImplementation(() => ({
where: vi.fn().mockImplementation(() => [
{
id: 'schedule-1',
workflowId: 'workflow-1',
blockId: null,
cronExpression: null,
lastRanAt: null,
failedCount: 0,
},
{
id: 'schedule-2',
workflowId: 'workflow-2',
blockId: null,
cronExpression: null,
lastRanAt: null,
failedCount: 0,
},
]),
})),
})),
}
return {
db: mockDb,
workflowSchedule: {},
}
})
const { GET } = await import('@/app/api/schedules/execute/route')
const response = await GET(createMockRequest())
expect(response.status).toBe(200)
const data = await response.json()
expect(data).toHaveProperty('executedCount', 2)
})
})

View File

@@ -1,673 +1,108 @@
import { db, userStats, workflow, workflowSchedule } from '@sim/db'
import { Cron } from 'croner'
import { and, eq, lte, not, sql } from 'drizzle-orm'
import { NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { getApiKeyOwnerUserId } from '@/lib/api-key/service'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { db, workflowSchedule } from '@sim/db'
import { tasks } from '@trigger.dev/sdk'
import { and, eq, lte, not } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { env, isTruthy } from '@/lib/env'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import {
type BlockState,
calculateNextRunTime as calculateNextTime,
getScheduleTimeValues,
getSubBlockValue,
} from '@/lib/schedules/utils'
import { decryptSecret, generateRequestId } from '@/lib/utils'
import { blockExistsInDeployment, loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
import { RateLimiter } from '@/services/queue'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
import { generateRequestId } from '@/lib/utils'
import { executeScheduleJob } from '@/background/schedule-execution'
export const dynamic = 'force-dynamic'
const logger = createLogger('ScheduledExecuteAPI')
// Maximum number of consecutive failures before disabling a schedule
const MAX_CONSECUTIVE_FAILURES = 3
export async function GET(request: NextRequest) {
const requestId = generateRequestId()
logger.info(`[${requestId}] Scheduled execution triggered at ${new Date().toISOString()}`)
/**
* Calculate the next run time for a schedule
* This is a wrapper around the utility function in schedule-utils.ts
*/
function calculateNextRunTime(
schedule: typeof workflowSchedule.$inferSelect,
blocks: Record<string, BlockState>
): Date {
// Look for either starter block or schedule trigger block
const scheduleBlock = Object.values(blocks).find(
(block) => block.type === 'starter' || block.type === 'schedule'
)
if (!scheduleBlock) throw new Error('No starter or schedule block found')
const scheduleType = getSubBlockValue(scheduleBlock, 'scheduleType')
const scheduleValues = getScheduleTimeValues(scheduleBlock)
if (schedule.cronExpression) {
const cron = new Cron(schedule.cronExpression)
const nextDate = cron.nextRun()
if (!nextDate) throw new Error('Invalid cron expression or no future occurrences')
return nextDate
const authError = verifyCronAuth(request, 'Schedule execution')
if (authError) {
return authError
}
const lastRanAt = schedule.lastRanAt ? new Date(schedule.lastRanAt) : null
return calculateNextTime(scheduleType, scheduleValues, lastRanAt)
}
const EnvVarsSchema = z.record(z.string())
const runningExecutions = new Set<string>()
export async function GET() {
logger.info(`Scheduled execution triggered at ${new Date().toISOString()}`)
const requestId = generateRequestId()
const now = new Date()
let dueSchedules: (typeof workflowSchedule.$inferSelect)[] = []
try {
dueSchedules = await db
const dueSchedules = await db
.select()
.from(workflowSchedule)
.where(
and(lte(workflowSchedule.nextRunAt, now), not(eq(workflowSchedule.status, 'disabled')))
)
.limit(10)
logger.debug(`[${requestId}] Successfully queried schedules: ${dueSchedules.length} found`)
logger.info(`[${requestId}] Processing ${dueSchedules.length} due scheduled workflows`)
for (const schedule of dueSchedules) {
const executionId = uuidv4()
try {
if (runningExecutions.has(schedule.workflowId)) {
logger.debug(`[${requestId}] Skipping workflow ${schedule.workflowId} - already running`)
continue
}
runningExecutions.add(schedule.workflowId)
logger.debug(`[${requestId}] Starting execution of workflow ${schedule.workflowId}`)
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, schedule.workflowId))
.limit(1)
if (!workflowRecord) {
logger.warn(`[${requestId}] Workflow ${schedule.workflowId} not found`)
runningExecutions.delete(schedule.workflowId)
continue
}
const actorUserId = await getApiKeyOwnerUserId(workflowRecord.pinnedApiKeyId)
if (!actorUserId) {
logger.warn(
`[${requestId}] Skipping schedule ${schedule.id}: pinned API key required to attribute usage.`
)
runningExecutions.delete(schedule.workflowId)
continue
}
// Check rate limits for scheduled execution (checks both personal and org subscriptions)
const userSubscription = await getHighestPrioritySubscription(actorUserId)
const rateLimiter = new RateLimiter()
const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription(
actorUserId,
userSubscription,
'schedule',
false // schedules are always sync
)
if (!rateLimitCheck.allowed) {
logger.warn(
`[${requestId}] Rate limit exceeded for scheduled workflow ${schedule.workflowId}`,
{
userId: workflowRecord.userId,
remaining: rateLimitCheck.remaining,
resetAt: rateLimitCheck.resetAt,
}
)
// Retry in 5 minutes for rate limit
const retryDelay = 5 * 60 * 1000 // 5 minutes
const nextRetryAt = new Date(now.getTime() + retryDelay)
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(`[${requestId}] Updated next retry time due to rate limit`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule for rate limit:`, updateError)
}
runningExecutions.delete(schedule.workflowId)
continue
}
const usageCheck = await checkServerSideUsageLimits(actorUserId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${workflowRecord.userId} has exceeded usage limits. Skipping scheduled execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId: schedule.workflowId,
}
)
try {
const deployedData = await loadDeployedWorkflowState(schedule.workflowId)
const nextRunAt = calculateNextRunTime(schedule, deployedData.blocks as any)
await db
.update(workflowSchedule)
.set({ updatedAt: now, nextRunAt })
.where(eq(workflowSchedule.id, schedule.id))
} catch (calcErr) {
logger.warn(
`[${requestId}] Unable to calculate nextRunAt while skipping schedule ${schedule.id}`,
calcErr
)
}
runningExecutions.delete(schedule.workflowId)
continue
}
// Execute scheduled workflow immediately (no queuing)
logger.info(`[${requestId}] Executing scheduled workflow ${schedule.workflowId}`)
const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)
if (useTrigger) {
const triggerPromises = dueSchedules.map(async (schedule) => {
try {
const executionSuccess = await (async () => {
// Create logging session inside the execution callback
const loggingSession = new LoggingSession(
schedule.workflowId,
executionId,
'schedule',
requestId
)
try {
logger.debug(`[${requestId}] Loading deployed workflow ${schedule.workflowId}`)
const deployedData = await loadDeployedWorkflowState(schedule.workflowId)
const blocks = deployedData.blocks
const edges = deployedData.edges
const loops = deployedData.loops
const parallels = deployedData.parallels
logger.info(`[${requestId}] Loaded deployed workflow ${schedule.workflowId}`)
// Validate that the schedule's trigger block exists in the deployed state
if (schedule.blockId) {
const blockExists = await blockExistsInDeployment(
schedule.workflowId,
schedule.blockId
)
if (!blockExists) {
logger.warn(
`[${requestId}] Schedule trigger block ${schedule.blockId} not found in deployed workflow ${schedule.workflowId}. Skipping execution.`
)
return { skip: true, blocks: {} as Record<string, BlockState> }
}
}
const mergedStates = mergeSubblockState(blocks)
// Retrieve environment variables with workspace precedence
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
actorUserId,
workflowRecord.workspaceId || undefined
)
const variables = EnvVarsSchema.parse({
...personalEncrypted,
...workspaceEncrypted,
})
const currentBlockStates = await Object.entries(mergedStates).reduce(
async (accPromise, [id, block]) => {
const acc = await accPromise
acc[id] = await Object.entries(block.subBlocks).reduce(
async (subAccPromise, [key, subBlock]) => {
const subAcc = await subAccPromise
let value = subBlock.value
if (
typeof value === 'string' &&
value.includes('{{') &&
value.includes('}}')
) {
const matches = value.match(/{{([^}]+)}}/g)
if (matches) {
for (const match of matches) {
const varName = match.slice(2, -2)
const encryptedValue = variables[varName]
if (!encryptedValue) {
throw new Error(`Environment variable "${varName}" was not found`)
}
try {
const { decrypted } = await decryptSecret(encryptedValue)
value = (value as string).replace(match, decrypted)
} catch (error: any) {
logger.error(
`[${requestId}] Error decrypting value for variable "${varName}"`,
error
)
throw new Error(
`Failed to decrypt environment variable "${varName}": ${error.message}`
)
}
}
}
}
subAcc[key] = value
return subAcc
},
Promise.resolve({} as Record<string, any>)
)
return acc
},
Promise.resolve({} as Record<string, Record<string, any>>)
)
const decryptedEnvVars: Record<string, string> = {}
for (const [key, encryptedValue] of Object.entries(variables)) {
try {
const { decrypted } = await decryptSecret(encryptedValue)
decryptedEnvVars[key] = decrypted
} catch (error: any) {
logger.error(
`[${requestId}] Failed to decrypt environment variable "${key}"`,
error
)
throw new Error(
`Failed to decrypt environment variable "${key}": ${error.message}`
)
}
}
// Process the block states to ensure response formats are properly parsed
const processedBlockStates = Object.entries(currentBlockStates).reduce(
(acc, [blockId, blockState]) => {
// Check if this block has a responseFormat that needs to be parsed
if (blockState.responseFormat && typeof blockState.responseFormat === 'string') {
const responseFormatValue = blockState.responseFormat.trim()
// Check for variable references like <start.input>
if (responseFormatValue.startsWith('<') && responseFormatValue.includes('>')) {
logger.debug(
`[${requestId}] Response format contains variable reference for block ${blockId}`
)
// Keep variable references as-is - they will be resolved during execution
acc[blockId] = blockState
} else if (responseFormatValue === '') {
// Empty string - remove response format
acc[blockId] = {
...blockState,
responseFormat: undefined,
}
} else {
try {
logger.debug(`[${requestId}] Parsing responseFormat for block ${blockId}`)
// Attempt to parse the responseFormat if it's a string
const parsedResponseFormat = JSON.parse(responseFormatValue)
acc[blockId] = {
...blockState,
responseFormat: parsedResponseFormat,
}
} catch (error) {
logger.warn(
`[${requestId}] Failed to parse responseFormat for block ${blockId}, using undefined`,
error
)
// Set to undefined instead of keeping malformed JSON - this allows execution to continue
acc[blockId] = {
...blockState,
responseFormat: undefined,
}
}
}
} else {
acc[blockId] = blockState
}
return acc
},
{} as Record<string, Record<string, any>>
)
// Get workflow variables
let workflowVariables = {}
if (workflowRecord.variables) {
try {
if (typeof workflowRecord.variables === 'string') {
workflowVariables = JSON.parse(workflowRecord.variables)
} else {
workflowVariables = workflowRecord.variables
}
} catch (error) {
logger.error(`Failed to parse workflow variables: ${schedule.workflowId}`, error)
}
}
const serializedWorkflow = new Serializer().serializeWorkflow(
mergedStates,
edges,
loops,
parallels,
true // Enable validation during execution
)
const input = {
_context: {
workflowId: schedule.workflowId,
},
}
// Start logging with environment variables
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflowRecord.workspaceId || '',
variables: variables || {},
})
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: input,
workflowVariables,
contextExtensions: {
executionId,
workspaceId: workflowRecord.workspaceId || '',
isDeployedContext: true,
},
})
// Set up logging on the executor
loggingSession.setupExecutor(executor)
const result = await executor.execute(
schedule.workflowId,
schedule.blockId || undefined
)
const executionResult =
'stream' in result && 'execution' in result ? result.execution : result
logger.info(`[${requestId}] Workflow execution completed: ${schedule.workflowId}`, {
success: executionResult.success,
executionTime: executionResult.metadata?.duration,
})
if (executionResult.success) {
await updateWorkflowRunCounts(schedule.workflowId)
try {
await db
.update(userStats)
.set({
totalScheduledExecutions: sql`total_scheduled_executions + 1`,
lastActive: now,
})
.where(eq(userStats.userId, actorUserId))
logger.debug(`[${requestId}] Updated user stats for scheduled execution`)
} catch (statsError) {
logger.error(`[${requestId}] Error updating user stats:`, statsError)
}
}
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
// Complete logging
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: executionResult.output || {},
traceSpans: (traceSpans || []) as any,
})
return { success: executionResult.success, blocks, executionResult }
} catch (earlyError: any) {
// Handle errors that occur before workflow execution (e.g., missing data, env vars, etc.)
logger.error(
`[${requestId}] Early failure in scheduled workflow ${schedule.workflowId}`,
earlyError
)
// Create a minimal log entry for early failures
try {
await loggingSession.safeStart({
userId: workflowRecord.userId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: `Schedule execution failed before workflow started: ${earlyError.message}`,
stackTrace: earlyError.stack,
},
traceSpans: [],
})
} catch (loggingError) {
logger.error(
`[${requestId}] Failed to create log entry for early schedule failure`,
loggingError
)
}
// Re-throw the error to be handled by the outer catch block
throw earlyError
}
})()
// Check if execution was skipped (e.g., trigger block not found)
if ('skip' in executionSuccess && executionSuccess.skip) {
runningExecutions.delete(schedule.workflowId)
continue
const payload = {
scheduleId: schedule.id,
workflowId: schedule.workflowId,
blockId: schedule.blockId || undefined,
cronExpression: schedule.cronExpression || undefined,
lastRanAt: schedule.lastRanAt?.toISOString(),
failedCount: schedule.failedCount || 0,
now: now.toISOString(),
}
if (executionSuccess.success) {
logger.info(`[${requestId}] Workflow ${schedule.workflowId} executed successfully`)
const nextRunAt = calculateNextRunTime(schedule, executionSuccess.blocks)
logger.debug(
`[${requestId}] Calculated next run time: ${nextRunAt.toISOString()} for workflow ${schedule.workflowId}`
)
try {
await db
.update(workflowSchedule)
.set({
lastRanAt: now,
updatedAt: now,
nextRunAt,
failedCount: 0, // Reset failure count on success
})
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(
`[${requestId}] Updated next run time for workflow ${schedule.workflowId} to ${nextRunAt.toISOString()}`
)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after success:`, updateError)
}
} else {
logger.warn(`[${requestId}] Workflow ${schedule.workflowId} execution failed`)
const newFailedCount = (schedule.failedCount || 0) + 1
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
const nextRunAt = calculateNextRunTime(schedule, executionSuccess.blocks)
if (shouldDisable) {
logger.warn(
`[${requestId}] Disabling schedule for workflow ${schedule.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
)
}
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
status: shouldDisable ? 'disabled' : 'active',
})
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(`[${requestId}] Updated schedule after failure`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after failure:`, updateError)
}
}
} catch (error: any) {
// Handle sync queue overload
if (error.message?.includes('Service overloaded')) {
logger.warn(`[${requestId}] Service overloaded, retrying schedule in 5 minutes`)
const retryDelay = 5 * 60 * 1000 // 5 minutes
const nextRetryAt = new Date(now.getTime() + retryDelay)
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(`[${requestId}] Updated schedule retry time due to service overload`)
} catch (updateError) {
logger.error(
`[${requestId}] Error updating schedule for service overload:`,
updateError
)
}
} else {
logger.error(
`[${requestId}] Error executing scheduled workflow ${schedule.workflowId}`,
error
)
// Ensure we create a log entry for this failed execution
try {
const failureLoggingSession = new LoggingSession(
schedule.workflowId,
executionId,
'schedule',
requestId
)
await failureLoggingSession.safeStart({
userId: workflowRecord.userId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await failureLoggingSession.safeCompleteWithError({
error: {
message: `Schedule execution failed: ${error.message}`,
stackTrace: error.stack,
},
traceSpans: [],
})
} catch (loggingError) {
logger.error(
`[${requestId}] Failed to create log entry for failed schedule execution`,
loggingError
)
}
let nextRunAt: Date
try {
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, schedule.workflowId))
.limit(1)
if (workflowRecord?.isDeployed) {
try {
const deployedData = await loadDeployedWorkflowState(schedule.workflowId)
nextRunAt = calculateNextRunTime(schedule, deployedData.blocks as any)
} catch {
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
} else {
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
} catch (workflowError) {
logger.error(
`[${requestId}] Error retrieving workflow for next run calculation`,
workflowError
)
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000) // 24 hours as a fallback
}
const newFailedCount = (schedule.failedCount || 0) + 1
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
if (shouldDisable) {
logger.warn(
`[${requestId}] Disabling schedule for workflow ${schedule.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
)
}
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
status: shouldDisable ? 'disabled' : 'active',
})
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(`[${requestId}] Updated schedule after execution error`)
} catch (updateError) {
logger.error(
`[${requestId}] Error updating schedule after execution error:`,
updateError
)
}
}
} finally {
runningExecutions.delete(schedule.workflowId)
const handle = await tasks.trigger('schedule-execution', payload)
logger.info(
`[${requestId}] Queued schedule execution task ${handle.id} for workflow ${schedule.workflowId}`
)
return handle
} catch (error) {
logger.error(
`[${requestId}] Failed to trigger schedule execution for workflow ${schedule.workflowId}`,
error
)
return null
}
} catch (error: any) {
logger.error(`[${requestId}] Error in scheduled execution handler`, error)
return NextResponse.json({ error: error.message }, { status: 500 })
}
})
await Promise.allSettled(triggerPromises)
logger.info(`[${requestId}] Queued ${dueSchedules.length} schedule executions to Trigger.dev`)
} else {
const directExecutionPromises = dueSchedules.map(async (schedule) => {
const payload = {
scheduleId: schedule.id,
workflowId: schedule.workflowId,
blockId: schedule.blockId || undefined,
cronExpression: schedule.cronExpression || undefined,
lastRanAt: schedule.lastRanAt?.toISOString(),
failedCount: schedule.failedCount || 0,
now: now.toISOString(),
}
void executeScheduleJob(payload).catch((error) => {
logger.error(
`[${requestId}] Direct schedule execution failed for workflow ${schedule.workflowId}`,
error
)
})
logger.info(
`[${requestId}] Queued direct schedule execution for workflow ${schedule.workflowId} (Trigger.dev disabled)`
)
})
await Promise.allSettled(directExecutionPromises)
logger.info(
`[${requestId}] Queued ${dueSchedules.length} direct schedule executions (Trigger.dev disabled)`
)
}
return NextResponse.json({
message: 'Scheduled workflow executions processed',
executedCount: dueSchedules.length,
})
} catch (error: any) {
logger.error(`[${requestId}] Error in scheduled execution handler`, error)
return NextResponse.json({ error: error.message }, { status: 500 })
}
return NextResponse.json({
message: 'Scheduled workflow executions processed',
executedCount: dueSchedules.length,
})
}
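Condensed view of the reworked route above: the GET handler authenticates the cron call, selects due schedules, and then either queues each one to Trigger.dev or runs the background job in-process. A simplified sketch of that dispatch step, with types and error handling trimmed (not the full route):

```typescript
// Simplified sketch of the fan-out logic in the route above.
import { tasks } from '@trigger.dev/sdk'
import { env, isTruthy } from '@/lib/env'
import { executeScheduleJob, type ScheduleExecutionPayload } from '@/background/schedule-execution'

type DueSchedule = {
  id: string
  workflowId: string
  blockId: string | null
  cronExpression: string | null
  lastRanAt: Date | null
  failedCount: number | null
}

async function dispatchDueSchedules(dueSchedules: DueSchedule[], now: Date) {
  const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)

  await Promise.allSettled(
    dueSchedules.map(async (schedule) => {
      const payload: ScheduleExecutionPayload = {
        scheduleId: schedule.id,
        workflowId: schedule.workflowId,
        blockId: schedule.blockId ?? undefined,
        cronExpression: schedule.cronExpression ?? undefined,
        lastRanAt: schedule.lastRanAt?.toISOString(),
        failedCount: schedule.failedCount ?? 0,
        now: now.toISOString(),
      }
      if (useTrigger) {
        // Queue the work onto Trigger.dev; the task id matches background/schedule-execution.
        await tasks.trigger('schedule-execution', payload)
      } else {
        // Trigger.dev disabled: run the same job in-process without awaiting it, as the route does.
        void executeScheduleJob(payload)
      }
    })
  )
}
```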

View File

@@ -0,0 +1,598 @@
import { db, userStats, workflow, workflowSchedule } from '@sim/db'
import { task } from '@trigger.dev/sdk'
import { Cron } from 'croner'
import { eq, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { getApiKeyOwnerUserId } from '@/lib/api-key/service'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import {
type BlockState,
calculateNextRunTime as calculateNextTime,
getScheduleTimeValues,
getSubBlockValue,
} from '@/lib/schedules/utils'
import { decryptSecret } from '@/lib/utils'
import { blockExistsInDeployment, loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
import { RateLimiter } from '@/services/queue'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
const logger = createLogger('TriggerScheduleExecution')
const MAX_CONSECUTIVE_FAILURES = 3
export type ScheduleExecutionPayload = {
scheduleId: string
workflowId: string
blockId?: string
cronExpression?: string
lastRanAt?: string
failedCount?: number
now: string
}
function calculateNextRunTime(
schedule: { cronExpression?: string; lastRanAt?: string },
blocks: Record<string, BlockState>
): Date {
const scheduleBlock = Object.values(blocks).find(
(block) => block.type === 'starter' || block.type === 'schedule'
)
if (!scheduleBlock) throw new Error('No starter or schedule block found')
const scheduleType = getSubBlockValue(scheduleBlock, 'scheduleType')
const scheduleValues = getScheduleTimeValues(scheduleBlock)
if (schedule.cronExpression) {
const cron = new Cron(schedule.cronExpression)
const nextDate = cron.nextRun()
if (!nextDate) throw new Error('Invalid cron expression or no future occurrences')
return nextDate
}
const lastRanAt = schedule.lastRanAt ? new Date(schedule.lastRanAt) : null
return calculateNextTime(scheduleType, scheduleValues, lastRanAt)
}
export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
const executionId = uuidv4()
const requestId = executionId.slice(0, 8)
const now = new Date(payload.now)
logger.info(`[${requestId}] Starting schedule execution`, {
scheduleId: payload.scheduleId,
workflowId: payload.workflowId,
executionId,
})
const EnvVarsSchema = (await import('zod')).z.record((await import('zod')).z.string())
try {
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, payload.workflowId))
.limit(1)
if (!workflowRecord) {
logger.warn(`[${requestId}] Workflow ${payload.workflowId} not found`)
return
}
const actorUserId = await getApiKeyOwnerUserId(workflowRecord.pinnedApiKeyId)
if (!actorUserId) {
logger.warn(
`[${requestId}] Skipping schedule ${payload.scheduleId}: pinned API key required to attribute usage.`
)
return
}
const userSubscription = await getHighestPrioritySubscription(actorUserId)
const rateLimiter = new RateLimiter()
const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription(
actorUserId,
userSubscription,
'schedule',
false
)
if (!rateLimitCheck.allowed) {
logger.warn(
`[${requestId}] Rate limit exceeded for scheduled workflow ${payload.workflowId}`,
{
userId: workflowRecord.userId,
remaining: rateLimitCheck.remaining,
resetAt: rateLimitCheck.resetAt,
}
)
const retryDelay = 5 * 60 * 1000
const nextRetryAt = new Date(now.getTime() + retryDelay)
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, payload.scheduleId))
logger.debug(`[${requestId}] Updated next retry time due to rate limit`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule for rate limit:`, updateError)
}
return
}
const usageCheck = await checkServerSideUsageLimits(actorUserId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${workflowRecord.userId} has exceeded usage limits. Skipping scheduled execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId: payload.workflowId,
}
)
try {
const deployedData = await loadDeployedWorkflowState(payload.workflowId)
const nextRunAt = calculateNextRunTime(payload, deployedData.blocks as any)
await db
.update(workflowSchedule)
.set({ updatedAt: now, nextRunAt })
.where(eq(workflowSchedule.id, payload.scheduleId))
} catch (calcErr) {
logger.warn(
`[${requestId}] Unable to calculate nextRunAt while skipping schedule ${payload.scheduleId}`,
calcErr
)
}
return
}
logger.info(`[${requestId}] Executing scheduled workflow ${payload.workflowId}`)
const loggingSession = new LoggingSession(
payload.workflowId,
executionId,
'schedule',
requestId
)
try {
const executionSuccess = await (async () => {
try {
logger.debug(`[${requestId}] Loading deployed workflow ${payload.workflowId}`)
const deployedData = await loadDeployedWorkflowState(payload.workflowId)
const blocks = deployedData.blocks
const edges = deployedData.edges
const loops = deployedData.loops
const parallels = deployedData.parallels
logger.info(`[${requestId}] Loaded deployed workflow ${payload.workflowId}`)
if (payload.blockId) {
const blockExists = await blockExistsInDeployment(payload.workflowId, payload.blockId)
if (!blockExists) {
logger.warn(
`[${requestId}] Schedule trigger block ${payload.blockId} not found in deployed workflow ${payload.workflowId}. Skipping execution.`
)
return { skip: true, blocks: {} as Record<string, BlockState> }
}
}
const mergedStates = mergeSubblockState(blocks)
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
actorUserId,
workflowRecord.workspaceId || undefined
)
const variables = EnvVarsSchema.parse({
...personalEncrypted,
...workspaceEncrypted,
})
const currentBlockStates = await Object.entries(mergedStates).reduce(
async (accPromise, [id, block]) => {
const acc = await accPromise
acc[id] = await Object.entries(block.subBlocks).reduce(
async (subAccPromise, [key, subBlock]) => {
const subAcc = await subAccPromise
let value = subBlock.value
if (typeof value === 'string' && value.includes('{{') && value.includes('}}')) {
const matches = value.match(/{{([^}]+)}}/g)
if (matches) {
for (const match of matches) {
const varName = match.slice(2, -2)
const encryptedValue = variables[varName]
if (!encryptedValue) {
throw new Error(`Environment variable "${varName}" was not found`)
}
try {
const { decrypted } = await decryptSecret(encryptedValue)
value = (value as string).replace(match, decrypted)
} catch (error: any) {
logger.error(
`[${requestId}] Error decrypting value for variable "${varName}"`,
error
)
throw new Error(
`Failed to decrypt environment variable "${varName}": ${error.message}`
)
}
}
}
}
subAcc[key] = value
return subAcc
},
Promise.resolve({} as Record<string, any>)
)
return acc
},
Promise.resolve({} as Record<string, Record<string, any>>)
)
const decryptedEnvVars: Record<string, string> = {}
for (const [key, encryptedValue] of Object.entries(variables)) {
try {
const { decrypted } = await decryptSecret(encryptedValue)
decryptedEnvVars[key] = decrypted
} catch (error: any) {
logger.error(`[${requestId}] Failed to decrypt environment variable "${key}"`, error)
throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`)
}
}
const processedBlockStates = Object.entries(currentBlockStates).reduce(
(acc, [blockId, blockState]) => {
if (blockState.responseFormat && typeof blockState.responseFormat === 'string') {
const responseFormatValue = blockState.responseFormat.trim()
if (responseFormatValue.startsWith('<') && responseFormatValue.includes('>')) {
logger.debug(
`[${requestId}] Response format contains variable reference for block ${blockId}`
)
acc[blockId] = blockState
} else if (responseFormatValue === '') {
acc[blockId] = {
...blockState,
responseFormat: undefined,
}
} else {
try {
logger.debug(`[${requestId}] Parsing responseFormat for block ${blockId}`)
const parsedResponseFormat = JSON.parse(responseFormatValue)
acc[blockId] = {
...blockState,
responseFormat: parsedResponseFormat,
}
} catch (error) {
logger.warn(
`[${requestId}] Failed to parse responseFormat for block ${blockId}, using undefined`,
error
)
acc[blockId] = {
...blockState,
responseFormat: undefined,
}
}
}
} else {
acc[blockId] = blockState
}
return acc
},
{} as Record<string, Record<string, any>>
)
let workflowVariables = {}
if (workflowRecord.variables) {
try {
if (typeof workflowRecord.variables === 'string') {
workflowVariables = JSON.parse(workflowRecord.variables)
} else {
workflowVariables = workflowRecord.variables
}
} catch (error) {
logger.error(`Failed to parse workflow variables: ${payload.workflowId}`, error)
}
}
const serializedWorkflow = new Serializer().serializeWorkflow(
mergedStates,
edges,
loops,
parallels,
true
)
const input = {
_context: {
workflowId: payload.workflowId,
},
}
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflowRecord.workspaceId || '',
variables: variables || {},
})
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: input,
workflowVariables,
contextExtensions: {
executionId,
workspaceId: workflowRecord.workspaceId || '',
isDeployedContext: true,
},
})
loggingSession.setupExecutor(executor)
const result = await executor.execute(payload.workflowId, payload.blockId || undefined)
const executionResult =
'stream' in result && 'execution' in result ? result.execution : result
logger.info(`[${requestId}] Workflow execution completed: ${payload.workflowId}`, {
success: executionResult.success,
executionTime: executionResult.metadata?.duration,
})
if (executionResult.success) {
await updateWorkflowRunCounts(payload.workflowId)
try {
await db
.update(userStats)
.set({
totalScheduledExecutions: sql`total_scheduled_executions + 1`,
lastActive: now,
})
.where(eq(userStats.userId, actorUserId))
logger.debug(`[${requestId}] Updated user stats for scheduled execution`)
} catch (statsError) {
logger.error(`[${requestId}] Error updating user stats:`, statsError)
}
}
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: executionResult.output || {},
traceSpans: (traceSpans || []) as any,
})
return { success: executionResult.success, blocks, executionResult }
} catch (earlyError: any) {
logger.error(
`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`,
earlyError
)
try {
await loggingSession.safeStart({
userId: workflowRecord.userId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await loggingSession.safeCompleteWithError({
error: {
message: `Schedule execution failed before workflow started: ${earlyError.message}`,
stackTrace: earlyError.stack,
},
traceSpans: [],
})
} catch (loggingError) {
logger.error(
`[${requestId}] Failed to create log entry for early schedule failure`,
loggingError
)
}
throw earlyError
}
})()
if ('skip' in executionSuccess && executionSuccess.skip) {
return
}
if (executionSuccess.success) {
logger.info(`[${requestId}] Workflow ${payload.workflowId} executed successfully`)
const nextRunAt = calculateNextRunTime(payload, executionSuccess.blocks)
logger.debug(
`[${requestId}] Calculated next run time: ${nextRunAt.toISOString()} for workflow ${payload.workflowId}`
)
try {
await db
.update(workflowSchedule)
.set({
lastRanAt: now,
updatedAt: now,
nextRunAt,
failedCount: 0,
})
.where(eq(workflowSchedule.id, payload.scheduleId))
logger.debug(
`[${requestId}] Updated next run time for workflow ${payload.workflowId} to ${nextRunAt.toISOString()}`
)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after success:`, updateError)
}
} else {
logger.warn(`[${requestId}] Workflow ${payload.workflowId} execution failed`)
const newFailedCount = (payload.failedCount || 0) + 1
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
const nextRunAt = calculateNextRunTime(payload, executionSuccess.blocks)
if (shouldDisable) {
logger.warn(
`[${requestId}] Disabling schedule for workflow ${payload.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
)
}
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
status: shouldDisable ? 'disabled' : 'active',
})
.where(eq(workflowSchedule.id, payload.scheduleId))
logger.debug(`[${requestId}] Updated schedule after failure`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after failure:`, updateError)
}
}
} catch (error: any) {
if (error.message?.includes('Service overloaded')) {
logger.warn(`[${requestId}] Service overloaded, retrying schedule in 5 minutes`)
const retryDelay = 5 * 60 * 1000
const nextRetryAt = new Date(now.getTime() + retryDelay)
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, payload.scheduleId))
logger.debug(`[${requestId}] Updated schedule retry time due to service overload`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule for service overload:`, updateError)
}
} else {
logger.error(
`[${requestId}] Error executing scheduled workflow ${payload.workflowId}`,
error
)
try {
const failureLoggingSession = new LoggingSession(
payload.workflowId,
executionId,
'schedule',
requestId
)
await failureLoggingSession.safeStart({
userId: workflowRecord.userId,
workspaceId: workflowRecord.workspaceId || '',
variables: {},
})
await failureLoggingSession.safeCompleteWithError({
error: {
message: `Schedule execution failed: ${error.message}`,
stackTrace: error.stack,
},
traceSpans: [],
})
} catch (loggingError) {
logger.error(
`[${requestId}] Failed to create log entry for failed schedule execution`,
loggingError
)
}
let nextRunAt: Date
try {
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, payload.workflowId))
.limit(1)
if (workflowRecord?.isDeployed) {
try {
const deployedData = await loadDeployedWorkflowState(payload.workflowId)
nextRunAt = calculateNextRunTime(payload, deployedData.blocks as any)
} catch {
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
} else {
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
} catch (workflowError) {
logger.error(
`[${requestId}] Error retrieving workflow for next run calculation`,
workflowError
)
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
const newFailedCount = (payload.failedCount || 0) + 1
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
if (shouldDisable) {
logger.warn(
`[${requestId}] Disabling schedule for workflow ${payload.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
)
}
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
status: shouldDisable ? 'disabled' : 'active',
})
.where(eq(workflowSchedule.id, payload.scheduleId))
logger.debug(`[${requestId}] Updated schedule after execution error`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after execution error:`, updateError)
}
}
}
} catch (error: any) {
logger.error(`[${requestId}] Error processing schedule ${payload.scheduleId}`, error)
}
}
export const scheduleExecution = task({
id: 'schedule-execution',
retry: {
maxAttempts: 1,
},
run: async (payload: ScheduleExecutionPayload) => executeScheduleJob(payload),
})
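A hedged usage sketch (not part of this diff; the import path and IDs are illustrative): with Trigger.dev enabled, the cron route enqueues this task rather than awaiting executeScheduleJob inline.
import { scheduleExecution } from '@/background/schedule-execution' // hypothetical path
await scheduleExecution.trigger({
  scheduleId: 'sched_123',
  workflowId: 'wf_456',
  failedCount: 0,
  now: new Date().toISOString(),
})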

View File

@@ -1,18 +1,29 @@
import * as yaml from 'js-yaml'
import { createLogger } from '@/lib/logs/console/logger'
import { getAccurateTokenCount } from '@/lib/tokenization'
import { estimateTokenCount } from '@/lib/tokenization/estimators'
import type { Chunk, ChunkerOptions } from './types'
const logger = createLogger('JsonYamlChunker')
function getTokenCount(text: string): number {
const estimate = estimateTokenCount(text)
return estimate.count
try {
return getAccurateTokenCount(text, 'text-embedding-3-small')
} catch (error) {
logger.warn('Tiktoken failed, falling back to estimation')
const estimate = estimateTokenCount(text)
return estimate.count
}
}
/**
* Configuration for JSON/YAML chunking
* Reduced limits to ensure we stay well under OpenAI's 8,191 token limit per embedding request
*/
const JSON_YAML_CHUNKING_CONFIG = {
TARGET_CHUNK_SIZE: 2000, // Target tokens per chunk
TARGET_CHUNK_SIZE: 1000, // Target tokens per chunk
MIN_CHUNK_SIZE: 100, // Minimum tokens per chunk
MAX_CHUNK_SIZE: 3000, // Maximum tokens per chunk
MAX_CHUNK_SIZE: 1500, // Maximum tokens per chunk
MAX_DEPTH_FOR_SPLITTING: 5, // Maximum depth to traverse for splitting
}
@@ -34,7 +45,6 @@ export class JsonYamlChunker {
return true
} catch {
try {
const yaml = require('js-yaml')
yaml.load(content)
return true
} catch {
@@ -48,9 +58,26 @@ export class JsonYamlChunker {
*/
async chunk(content: string): Promise<Chunk[]> {
try {
const data = JSON.parse(content)
return this.chunkStructuredData(data)
let data: any
try {
data = JSON.parse(content)
} catch {
data = yaml.load(content)
}
const chunks = this.chunkStructuredData(data)
const tokenCounts = chunks.map((c) => c.tokenCount)
const totalTokens = tokenCounts.reduce((a, b) => a + b, 0)
const maxTokens = Math.max(...tokenCounts)
const avgTokens = Math.round(totalTokens / chunks.length)
logger.info(
`JSON chunking complete: ${chunks.length} chunks, ${totalTokens} total tokens (avg: ${avgTokens}, max: ${maxTokens})`
)
return chunks
} catch (error) {
logger.info('JSON parsing failed, falling back to text chunking')
return this.chunkAsText(content)
}
}
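// Usage illustration (not part of this diff; `bigConfig` is hypothetical): the static
// helper further below wraps this method, and every returned chunk carries a
// tiktoken-accurate tokenCount that stays within MAX_CHUNK_SIZE.
//   const chunks = await JsonYamlChunker.chunkJsonYaml(JSON.stringify(bigConfig))
//   chunks.forEach((c) => console.log(c.tokenCount))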
@@ -102,7 +129,6 @@ export class JsonYamlChunker {
const itemTokens = getTokenCount(itemStr)
if (itemTokens > this.chunkSize) {
// Save current batch if it has items
if (currentBatch.length > 0) {
const batchContent = contextHeader + JSON.stringify(currentBatch, null, 2)
chunks.push({
@@ -134,7 +160,7 @@ export class JsonYamlChunker {
const batchContent = contextHeader + JSON.stringify(currentBatch, null, 2)
chunks.push({
text: batchContent,
tokenCount: currentTokens,
tokenCount: getTokenCount(batchContent),
metadata: {
startIndex: i - currentBatch.length,
endIndex: i - 1,
@@ -152,7 +178,7 @@ export class JsonYamlChunker {
const batchContent = contextHeader + JSON.stringify(currentBatch, null, 2)
chunks.push({
text: batchContent,
tokenCount: currentTokens,
tokenCount: getTokenCount(batchContent),
metadata: {
startIndex: arr.length - currentBatch.length,
endIndex: arr.length - 1,
@@ -194,12 +220,11 @@ export class JsonYamlChunker {
const valueTokens = getTokenCount(valueStr)
if (valueTokens > this.chunkSize) {
// Save current object if it has properties
if (Object.keys(currentObj).length > 0) {
const objContent = JSON.stringify(currentObj, null, 2)
chunks.push({
text: objContent,
tokenCount: currentTokens,
tokenCount: getTokenCount(objContent),
metadata: {
startIndex: 0,
endIndex: objContent.length,
@@ -230,7 +255,7 @@ export class JsonYamlChunker {
const objContent = JSON.stringify(currentObj, null, 2)
chunks.push({
text: objContent,
tokenCount: currentTokens,
tokenCount: getTokenCount(objContent),
metadata: {
startIndex: 0,
endIndex: objContent.length,
@@ -250,7 +275,7 @@ export class JsonYamlChunker {
const objContent = JSON.stringify(currentObj, null, 2)
chunks.push({
text: objContent,
tokenCount: currentTokens,
tokenCount: getTokenCount(objContent),
metadata: {
startIndex: 0,
endIndex: objContent.length,
@@ -262,7 +287,7 @@ export class JsonYamlChunker {
}
/**
* Fall back to text chunking if JSON parsing fails.
* Fall back to text chunking if JSON parsing fails
*/
private async chunkAsText(content: string): Promise<Chunk[]> {
const chunks: Chunk[] = []
@@ -308,7 +333,7 @@ export class JsonYamlChunker {
}
/**
* Static method for chunking JSON/YAML data with default options.
* Static method for chunking JSON/YAML data with default options
*/
static async chunkJsonYaml(content: string, options: ChunkerOptions = {}): Promise<Chunk[]> {
const chunker = new JsonYamlChunker(options)

View File

@@ -1,9 +1,12 @@
import { env } from '@/lib/env'
import { isRetryableError, retryWithExponentialBackoff } from '@/lib/knowledge/documents/utils'
import { createLogger } from '@/lib/logs/console/logger'
import { batchByTokenLimit, getTotalTokenCount } from '@/lib/tokenization'
const logger = createLogger('EmbeddingUtils')
const MAX_TOKENS_PER_REQUEST = 8000
export class EmbeddingAPIError extends Error {
public status: number
@@ -104,7 +107,8 @@ async function callEmbeddingAPI(inputs: string[], config: EmbeddingConfig): Prom
}
/**
* Generate embeddings for multiple texts with simple batching
* Generate embeddings for multiple texts with token-aware batching
* Uses tiktoken for token counting
*/
export async function generateEmbeddings(
texts: string[],
@@ -112,27 +116,45 @@ export async function generateEmbeddings(
): Promise<number[][]> {
const config = getEmbeddingConfig(embeddingModel)
logger.info(`Using ${config.useAzure ? 'Azure OpenAI' : 'OpenAI'} for embeddings generation`)
logger.info(
`Using ${config.useAzure ? 'Azure OpenAI' : 'OpenAI'} for embeddings generation (${texts.length} texts)`
)
const batches = batchByTokenLimit(texts, MAX_TOKENS_PER_REQUEST, embeddingModel)
logger.info(
`Split ${texts.length} texts into ${batches.length} batches (max ${MAX_TOKENS_PER_REQUEST} tokens per batch)`
)
// Reduced batch size to prevent API timeouts and improve reliability
const batchSize = 50 // Reduced from 100 to prevent issues with large documents
const allEmbeddings: number[][] = []
for (let i = 0; i < texts.length; i += batchSize) {
const batch = texts.slice(i, i + batchSize)
const batchEmbeddings = await callEmbeddingAPI(batch, config)
allEmbeddings.push(...batchEmbeddings)
for (let i = 0; i < batches.length; i++) {
const batch = batches[i]
const batchTokenCount = getTotalTokenCount(batch, embeddingModel)
logger.info(
`Generated embeddings for batch ${Math.floor(i / batchSize) + 1}/${Math.ceil(texts.length / batchSize)}`
`Processing batch ${i + 1}/${batches.length}: ${batch.length} texts, ${batchTokenCount} tokens`
)
// Add small delay between batches to avoid rate limiting
if (i + batchSize < texts.length) {
try {
const batchEmbeddings = await callEmbeddingAPI(batch, config)
allEmbeddings.push(...batchEmbeddings)
logger.info(
`Generated ${batchEmbeddings.length} embeddings for batch ${i + 1}/${batches.length}`
)
} catch (error) {
logger.error(`Failed to generate embeddings for batch ${i + 1}:`, error)
throw error
}
if (i + 1 < batches.length) {
await new Promise((resolve) => setTimeout(resolve, 100))
}
}
logger.info(`Successfully generated ${allEmbeddings.length} embeddings total`)
return allEmbeddings
}
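A hedged composition sketch (variable names are illustrative, not from this diff): chunker output feeds the token-aware embedder, which batches by MAX_TOKENS_PER_REQUEST and paces requests internally.
const texts = chunks.map((c) => c.text) // chunks produced by JsonYamlChunker
const embeddings = await generateEmbeddings(texts)
// embeddings[i] corresponds to texts[i]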

View File

@@ -17,8 +17,8 @@ export function PostHogProvider({ children }: { children: React.ReactNode }) {
defaults: '2025-05-24',
person_profiles: 'identified_only',
capture_pageview: true,
capture_pageleave: true,
capture_performance: true,
capture_pageleave: false,
capture_performance: false,
session_recording: {
maskAllInputs: false,
maskInputOptions: {
@@ -26,13 +26,16 @@ export function PostHogProvider({ children }: { children: React.ReactNode }) {
email: false,
},
recordCrossOriginIframes: false,
recordHeaders: true,
recordBody: true,
recordHeaders: false,
recordBody: false,
},
autocapture: true,
capture_dead_clicks: true,
autocapture: {
dom_event_allowlist: ['click', 'submit', 'change'],
element_allowlist: ['button', 'a', 'input'],
},
capture_dead_clicks: false,
persistence: 'localStorage+cookie',
enable_heatmaps: true,
enable_heatmaps: false,
})
}
}, [])

View File

@@ -1,7 +1,8 @@
/**
* Token estimation functions for different providers
* Token estimation and accurate counting functions for different providers
*/
import { encodingForModel, type Tiktoken } from 'js-tiktoken'
import { createLogger } from '@/lib/logs/console/logger'
import { MIN_TEXT_LENGTH_FOR_ESTIMATION, TOKENIZATION_CONFIG } from '@/lib/tokenization/constants'
import type { TokenEstimate } from '@/lib/tokenization/types'
@@ -9,6 +10,160 @@ import { getProviderConfig } from '@/lib/tokenization/utils'
const logger = createLogger('TokenizationEstimators')
const encodingCache = new Map<string, Tiktoken>()
/**
* Get or create a cached encoding for a model
*/
function getEncoding(modelName: string): Tiktoken {
if (encodingCache.has(modelName)) {
return encodingCache.get(modelName)!
}
try {
const encoding = encodingForModel(modelName as Parameters<typeof encodingForModel>[0])
encodingCache.set(modelName, encoding)
return encoding
} catch (error) {
logger.warn(`Failed to get encoding for model ${modelName}, falling back to cl100k_base`)
const encoding = encodingForModel('gpt-4')
encodingCache.set(modelName, encoding)
return encoding
}
}
if (typeof process !== 'undefined') {
process.on('beforeExit', () => {
clearEncodingCache()
})
}
/**
* Get accurate token count for text using tiktoken
* This is the exact count OpenAI's API will use
*/
export function getAccurateTokenCount(text: string, modelName = 'text-embedding-3-small'): number {
if (!text || text.length === 0) {
return 0
}
try {
const encoding = getEncoding(modelName)
const tokens = encoding.encode(text)
return tokens.length
} catch (error) {
logger.error('Error counting tokens with tiktoken:', error)
return Math.ceil(text.length / 4)
}
}
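// Illustration (not part of this diff): cl100k-style encodings split "hello world"
// into two tokens, so the call below returns 2; if tiktoken ever fails, the
// chars/4 heuristic above is used instead.
const exampleCount = getAccurateTokenCount('hello world') // -> 2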
/**
* Truncate text to a maximum token count
* Useful for handling texts that exceed model limits
*/
export function truncateToTokenLimit(
text: string,
maxTokens: number,
modelName = 'text-embedding-3-small'
): string {
if (!text || maxTokens <= 0) {
return ''
}
try {
const encoding = getEncoding(modelName)
const tokens = encoding.encode(text)
if (tokens.length <= maxTokens) {
return text
}
const truncatedTokens = tokens.slice(0, maxTokens)
const truncatedText = encoding.decode(truncatedTokens)
logger.warn(
`Truncated text from ${tokens.length} to ${maxTokens} tokens (${text.length} to ${truncatedText.length} chars)`
)
return truncatedText
} catch (error) {
logger.error('Error truncating text:', error)
const maxChars = maxTokens * 4
return text.slice(0, maxChars)
}
}
/**
* Get token count for multiple texts (for batching decisions)
* Returns array of token counts in same order as input
*/
export function getTokenCountsForBatch(
texts: string[],
modelName = 'text-embedding-3-small'
): number[] {
return texts.map((text) => getAccurateTokenCount(text, modelName))
}
/**
* Calculate total tokens across multiple texts
*/
export function getTotalTokenCount(texts: string[], modelName = 'text-embedding-3-small'): number {
return texts.reduce((total, text) => total + getAccurateTokenCount(text, modelName), 0)
}
/**
* Batch texts by token count to stay within API limits
* Returns array of batches where each batch's total tokens <= maxTokensPerBatch
*/
export function batchByTokenLimit(
texts: string[],
maxTokensPerBatch: number,
modelName = 'text-embedding-3-small'
): string[][] {
const batches: string[][] = []
let currentBatch: string[] = []
let currentTokenCount = 0
for (const text of texts) {
const tokenCount = getAccurateTokenCount(text, modelName)
if (tokenCount > maxTokensPerBatch) {
if (currentBatch.length > 0) {
batches.push(currentBatch)
currentBatch = []
currentTokenCount = 0
}
const truncated = truncateToTokenLimit(text, maxTokensPerBatch, modelName)
batches.push([truncated])
continue
}
if (currentBatch.length > 0 && currentTokenCount + tokenCount > maxTokensPerBatch) {
batches.push(currentBatch)
currentBatch = [text]
currentTokenCount = tokenCount
} else {
currentBatch.push(text)
currentTokenCount += tokenCount
}
}
if (currentBatch.length > 0) {
batches.push(currentBatch)
}
return batches
}
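// Worked illustration (hypothetical inputs): short texts are packed greedily into one
// batch, while an oversized text is truncated and isolated in its own batch.
const longText = 'x '.repeat(50_000) // far more than 8,000 tokens
const exampleBatches = batchByTokenLimit(['short a', 'short b', longText], 8000)
// -> [['short a', 'short b'], [longText truncated to 8,000 tokens]]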
/**
* Clean up cached encodings (call when shutting down)
*/
export function clearEncodingCache(): void {
encodingCache.clear()
logger.info('Cleared tiktoken encoding cache')
}
/**
* Estimates token count for text using provider-specific heuristics
*/
@@ -60,7 +215,6 @@ function estimateOpenAITokens(text: string): number {
for (const word of words) {
if (word.length === 0) continue
// GPT tokenizer characteristics based on BPE
if (word.length <= 4) {
tokenCount += 1
} else if (word.length <= 8) {
@@ -69,12 +223,10 @@ function estimateOpenAITokens(text: string): number {
tokenCount += Math.ceil(word.length / 4)
}
// Add extra tokens for punctuation
const punctuationCount = (word.match(/[.,!?;:"'()[\]{}<>]/g) || []).length
tokenCount += punctuationCount * 0.5
}
// Add tokens for newlines and formatting
const newlineCount = (text.match(/\n/g) || []).length
tokenCount += newlineCount * 0.5
@@ -91,7 +243,6 @@ function estimateAnthropicTokens(text: string): number {
for (const word of words) {
if (word.length === 0) continue
// Claude tokenizer tends to be slightly more efficient
if (word.length <= 4) {
tokenCount += 1
} else if (word.length <= 8) {
@@ -101,7 +252,6 @@ function estimateAnthropicTokens(text: string): number {
}
}
// Claude handles formatting slightly better
const newlineCount = (text.match(/\n/g) || []).length
tokenCount += newlineCount * 0.3
@@ -118,7 +268,6 @@ function estimateGoogleTokens(text: string): number {
for (const word of words) {
if (word.length === 0) continue
// Gemini tokenizer characteristics
if (word.length <= 5) {
tokenCount += 1
} else if (word.length <= 10) {

View File

@@ -6,9 +6,15 @@ export {
export { LLM_BLOCK_TYPES, TOKENIZATION_CONFIG } from '@/lib/tokenization/constants'
export { createTokenizationError, TokenizationError } from '@/lib/tokenization/errors'
export {
batchByTokenLimit,
clearEncodingCache,
estimateInputTokens,
estimateOutputTokens,
estimateTokenCount,
getAccurateTokenCount,
getTokenCountsForBatch,
getTotalTokenCount,
truncateToTokenLimit,
} from '@/lib/tokenization/estimators'
export { processStreamingBlockLog, processStreamingBlockLogs } from '@/lib/tokenization/streaming'
export type {

View File

@@ -220,13 +220,13 @@ const nextConfig: NextConfig = {
if (isHosted) {
redirects.push(
{
source: '/((?!api|_next|_vercel|favicon|static|.*\\..*).*)',
source: '/((?!api|_next|_vercel|favicon|static|ingest|.*\\..*).*)',
destination: 'https://www.sim.ai/$1',
permanent: true,
has: [{ type: 'host' as const, value: 'simstudio.ai' }],
},
{
source: '/((?!api|_next|_vercel|favicon|static|.*\\..*).*)',
source: '/((?!api|_next|_vercel|favicon|static|ingest|.*\\..*).*)',
destination: 'https://www.sim.ai/$1',
permanent: true,
has: [{ type: 'host' as const, value: 'www.simstudio.ai' }],
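// Rough check of the added `ingest` exclusion (simplified: Next.js `source` patterns use
// path-to-regexp syntax, approximated here with a plain RegExp):
//   const p = /^\/(?!api|_next|_vercel|favicon|static|ingest|.*\..*).*/
//   p.test('/ingest/e')      // false -> PostHog proxy traffic is no longer redirected
//   p.test('/workspace/123') // true  -> still redirected to https://www.sim.ai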

View File

@@ -82,6 +82,7 @@
"input-otp": "^1.4.2",
"ioredis": "^5.6.0",
"jose": "6.0.11",
"js-tiktoken": "1.0.21",
"js-yaml": "4.1.0",
"jwt-decode": "^4.0.0",
"lucide-react": "^0.479.0",
@@ -119,6 +120,7 @@
},
"devDependencies": {
"@testing-library/jest-dom": "^6.6.3",
"@trigger.dev/build": "4.0.4",
"@types/html-to-text": "9.0.4",
"@types/js-yaml": "4.0.9",
"@types/jsdom": "21.1.7",

View File

@@ -1,3 +1,4 @@
import { additionalPackages } from '@trigger.dev/build/extensions/core'
import { defineConfig } from '@trigger.dev/sdk'
import { env } from './lib/env'
@@ -13,4 +14,12 @@ export default defineConfig({
},
},
dirs: ['./background'],
build: {
extensions: [
// pdf-parse has native bindings, keep as external package
additionalPackages({
packages: ['pdf-parse'],
}),
],
},
})

View File

@@ -116,6 +116,7 @@
"input-otp": "^1.4.2",
"ioredis": "^5.6.0",
"jose": "6.0.11",
"js-tiktoken": "1.0.21",
"js-yaml": "4.1.0",
"jwt-decode": "^4.0.0",
"lucide-react": "^0.479.0",
@@ -153,6 +154,7 @@
},
"devDependencies": {
"@testing-library/jest-dom": "^6.6.3",
"@trigger.dev/build": "4.0.4",
"@types/html-to-text": "9.0.4",
"@types/js-yaml": "4.0.9",
"@types/jsdom": "21.1.7",
@@ -1217,6 +1219,8 @@
"@tokenizer/token": ["@tokenizer/token@0.3.0", "", {}, "sha512-OvjF+z51L3ov0OyAU0duzsYuvO01PH7x4t6DJx+guahgTnBHkhJdG7soQeTSFLWN3efnHyibZ4Z8l2EuWwJN3A=="],
"@trigger.dev/build": ["@trigger.dev/build@4.0.4", "", { "dependencies": { "@trigger.dev/core": "4.0.4", "pkg-types": "^1.1.3", "tinyglobby": "^0.2.2", "tsconfck": "3.1.3" } }, "sha512-W3mP+RBkcYOrNYTTmQ/WdU6LB+2Tk1S6r3OjEWqXEPsXLEEw6BzHTHZBirHYX4lWRBL9jVkL+/H74ycyNfzRjg=="],
"@trigger.dev/core": ["@trigger.dev/core@4.0.4", "", { "dependencies": { "@bugsnag/cuid": "^3.1.1", "@electric-sql/client": "1.0.0-beta.1", "@google-cloud/precise-date": "^4.0.0", "@jsonhero/path": "^1.0.21", "@opentelemetry/api": "1.9.0", "@opentelemetry/api-logs": "0.203.0", "@opentelemetry/core": "2.0.1", "@opentelemetry/exporter-logs-otlp-http": "0.203.0", "@opentelemetry/exporter-trace-otlp-http": "0.203.0", "@opentelemetry/instrumentation": "0.203.0", "@opentelemetry/resources": "2.0.1", "@opentelemetry/sdk-logs": "0.203.0", "@opentelemetry/sdk-trace-base": "2.0.1", "@opentelemetry/sdk-trace-node": "2.0.1", "@opentelemetry/semantic-conventions": "1.36.0", "dequal": "^2.0.3", "eventsource": "^3.0.5", "eventsource-parser": "^3.0.0", "execa": "^8.0.1", "humanize-duration": "^3.27.3", "jose": "^5.4.0", "nanoid": "3.3.8", "prom-client": "^15.1.0", "socket.io": "4.7.4", "socket.io-client": "4.7.5", "std-env": "^3.8.1", "superjson": "^2.2.1", "tinyexec": "^0.3.2", "uncrypto": "^0.1.3", "zod": "3.25.76", "zod-error": "1.5.0", "zod-validation-error": "^1.5.0" } }, "sha512-c5myttkNhqaqvLlEz3ttE1qEsULlD6ILBge5FAfEtMv9HVS/pNlgvMKrdFMefaGO/bE4HoxrNGdJsY683Kq32w=="],
"@trigger.dev/sdk": ["@trigger.dev/sdk@4.0.4", "", { "dependencies": { "@opentelemetry/api": "1.9.0", "@opentelemetry/semantic-conventions": "1.36.0", "@trigger.dev/core": "4.0.4", "chalk": "^5.2.0", "cronstrue": "^2.21.0", "debug": "^4.3.4", "evt": "^2.4.13", "slug": "^6.0.0", "ulid": "^2.3.0", "uncrypto": "^0.1.3", "uuid": "^9.0.0", "ws": "^8.11.0" }, "peerDependencies": { "ai": "^4.2.0 || ^5.0.0", "zod": "^3.0.0 || ^4.0.0" }, "optionalPeers": ["ai"] }, "sha512-54krRw9SN1CGm5u17JBzu0hNzRf1u37jKbSFFngPJjUOltOgi/owey5+KNu1rGthabhOBK2VKzvKEd4sn08RCA=="],
@@ -1611,7 +1615,7 @@
"concurrently": ["concurrently@9.2.1", "", { "dependencies": { "chalk": "4.1.2", "rxjs": "7.8.2", "shell-quote": "1.8.3", "supports-color": "8.1.1", "tree-kill": "1.2.2", "yargs": "17.7.2" }, "bin": { "conc": "dist/bin/concurrently.js", "concurrently": "dist/bin/concurrently.js" } }, "sha512-fsfrO0MxV64Znoy8/l1vVIjjHa29SZyyqPgQBwhiDcaW8wJc2W3XWVOGx4M3oJBnv/zdUZIIp1gDeS98GzP8Ng=="],
"confbox": ["confbox@0.2.2", "", {}, "sha512-1NB+BKqhtNipMsov4xI/NnhCKp9XG9NamYp5PVm9klAT0fsrNPjaFICsCFhNhwZJKNh7zB/3q8qXz0E9oaMNtQ=="],
"confbox": ["confbox@0.1.8", "", {}, "sha512-RMtmw0iFkeR4YV+fUOSucriAQNb9g8zFR52MWCtl+cCZOFRNL6zeB395vPzFhEjjn4fMxXudmELnl/KF/WrK6w=="],
"consola": ["consola@3.4.2", "", {}, "sha512-5IKcdX0nnYavi6G7TtOhwkYzyjfJlatbjMjuLSfE2kYT5pMDOilZ4OvMhi637CcDICTmz3wARPoyhqyX1Y+XvA=="],
@@ -2115,6 +2119,8 @@
"joycon": ["joycon@3.1.1", "", {}, "sha512-34wB/Y7MW7bzjKRjUKTa46I2Z7eV62Rkhva+KkopW7Qvv/OSWBqvkSY7vusOPrNuZcUG3tApvdVgNB8POj3SPw=="],
"js-tiktoken": ["js-tiktoken@1.0.21", "", { "dependencies": { "base64-js": "^1.5.1" } }, "sha512-biOj/6M5qdgx5TKjDnFT1ymSpM5tbd3ylwDtrQvFQSu0Z7bBYko2dF+W/aUkXUPuk6IVpRxk/3Q2sHOzGlS36g=="],
"js-tokens": ["js-tokens@9.0.1", "", {}, "sha512-mxa9E9ITFOt0ban3j6L5MpjwegGz6lBQmM1IJkWeBZGcMxto50+eWdjC/52xDbS2vy0k7vIMK0Fe2wfL9OQSpQ=="],
"js-yaml": ["js-yaml@4.1.0", "", { "dependencies": { "argparse": "^2.0.1" }, "bin": { "js-yaml": "bin/js-yaml.js" } }, "sha512-wpxZs9NoxZaJESJGIZTyDEaYpl0FKSA+FB9aJiyemKhMwkxQg63h4T1KJgUGHpTqPDNRcmmYLugrRjJlBtWvRA=="],
@@ -2383,6 +2389,8 @@
"minizlib": ["minizlib@3.1.0", "", { "dependencies": { "minipass": "^7.1.2" } }, "sha512-KZxYo1BUkWD2TVFLr0MQoM8vUUigWD3LlD83a/75BqC+4qE0Hb1Vo5v1FgcfaNXvfXzr+5EhQ6ing/CaBijTlw=="],
"mlly": ["mlly@1.8.0", "", { "dependencies": { "acorn": "^8.15.0", "pathe": "^2.0.3", "pkg-types": "^1.3.1", "ufo": "^1.6.1" } }, "sha512-l8D9ODSRWLe2KHJSifWGwBqpTZXIXTeo8mlKjY+E2HAakaTeNpqAyBZ8GSqLzHgw4XmHmC8whvpjJNMbFZN7/g=="],
"module-details-from-path": ["module-details-from-path@1.0.4", "", {}, "sha512-EGWKgxALGMgzvxYF1UyGTy0HXX/2vHLkw6+NvDKW2jypWbHpjQuj4UMcqQWXHERJhVGKikolT06G3bcKe4fi7w=="],
"mongodb": ["mongodb@6.19.0", "", { "dependencies": { "@mongodb-js/saslprep": "^1.1.9", "bson": "^6.10.4", "mongodb-connection-string-url": "^3.0.0" }, "peerDependencies": { "@aws-sdk/credential-providers": "^3.188.0", "@mongodb-js/zstd": "^1.1.0 || ^2.0.0", "gcp-metadata": "^5.2.0", "kerberos": "^2.0.1", "mongodb-client-encryption": ">=6.0.0 <7", "snappy": "^7.3.2", "socks": "^2.7.1" }, "optionalPeers": ["@aws-sdk/credential-providers", "@mongodb-js/zstd", "gcp-metadata", "kerberos", "mongodb-client-encryption", "snappy", "socks"] }, "sha512-H3GtYujOJdeKIMLKBT9PwlDhGrQfplABNF1G904w6r5ZXKWyv77aB0X9B+rhmaAwjtllHzaEkvi9mkGVZxs2Bw=="],
@@ -2547,7 +2555,7 @@
"pkce-challenge": ["pkce-challenge@5.0.0", "", {}, "sha512-ueGLflrrnvwB3xuo/uGob5pd5FN7l0MsLf0Z87o/UQmRtwjvfylfc9MurIxRAWywCYTgrvpXBcqjV4OfCYGCIQ=="],
"pkg-types": ["pkg-types@2.3.0", "", { "dependencies": { "confbox": "^0.2.2", "exsolve": "^1.0.7", "pathe": "^2.0.3" } }, "sha512-SIqCzDRg0s9npO5XQ3tNZioRY1uK06lA41ynBC1YmFTmnY6FjUjVt6s4LoADmwoig1qqD0oK8h1p/8mlMx8Oig=="],
"pkg-types": ["pkg-types@1.3.1", "", { "dependencies": { "confbox": "^0.1.8", "mlly": "^1.7.4", "pathe": "^2.0.1" } }, "sha512-/Jm5M4RvtBFVkKWRu2BLUTNP8/M2a+UwuAX+ae4770q1qVGtfjG+WTCupoZixokjmHiry8uI+dlY8KXYV5HVVQ=="],
"platform": ["platform@1.3.6", "", {}, "sha512-fnWVljUchTro6RiCFvCXBbNhJc2NijN7oIQxbwsyL0buWJPG85v81ehlHI9fXrJsMNgTofEoWIQeClKpgxFLrg=="],
@@ -2959,7 +2967,7 @@
"tsafe": ["tsafe@1.8.10", "", {}, "sha512-2bBiNHk6Ts4LZQ4+6OxF/BtkJ8YWqo1VMbMo6qrRIZoqAwM8xuwWUx9g3C/p6cCdUmNWeOWIaiJzgO5zWy1Cdg=="],
"tsconfck": ["tsconfck@3.1.6", "", { "peerDependencies": { "typescript": "^5.0.0" }, "optionalPeers": ["typescript"], "bin": { "tsconfck": "bin/tsconfck.js" } }, "sha512-ks6Vjr/jEw0P1gmOVwutM3B7fWxoWBL2KRDb1JfqGVawBmO5UsvmWOQFGHBPl5yxYz4eERr19E6L7NMv+Fej4w=="],
"tsconfck": ["tsconfck@3.1.3", "", { "peerDependencies": { "typescript": "^5.0.0" }, "optionalPeers": ["typescript"], "bin": { "tsconfck": "bin/tsconfck.js" } }, "sha512-ulNZP1SVpRDesxeMLON/LtWM8HIgAJEIVpVVhBM6gsmvQ8+Rh+ZG7FWGvHh7Ah3pRABwVJWklWCr/BTZSv0xnQ=="],
"tsconfig-paths": ["tsconfig-paths@4.2.0", "", { "dependencies": { "json5": "^2.2.2", "minimist": "^1.2.6", "strip-bom": "^3.0.0" } }, "sha512-NoZ4roiN7LnbKn9QqE1amc9DJfzvZXxF4xDavcOWt1BPkdx+m+0gJuPM+S0vCe7zTJMYUP0R8pO2XMr+Y8oLIg=="],
@@ -2991,6 +2999,8 @@
"typescript": ["typescript@5.9.2", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-CWBzXQrc/qOkhidw1OzBTQuYRbfyxDXJMVJ1XNwUHGROVmuaeiEm3OslpZ1RV96d7SKKjZKrSJu3+t/xlw3R9A=="],
"ufo": ["ufo@1.6.1", "", {}, "sha512-9a4/uxlTWJ4+a5i0ooc1rU7C7YOw3wT+UGqdeNNHWnOF9qcMBgLRS+4IYUqbczewFx4mLEig6gawh7X6mFlEkA=="],
"ulid": ["ulid@2.4.0", "", { "bin": { "ulid": "bin/cli.js" } }, "sha512-fIRiVTJNcSRmXKPZtGzFQv9WRrZ3M9eoptl/teFJvjOzmpU+/K/JH6HZ8deBfb5vMEpicJcLn7JmvdknlMq7Zg=="],
"uncrypto": ["uncrypto@0.1.3", "", {}, "sha512-Ql87qFHB3s/De2ClA9e0gsnS6zXG27SkTiSJwjCc9MebbfapQfuPzumMIUMi38ezPZVNFcHI9sUIepeQfw8J8Q=="],
@@ -3495,6 +3505,8 @@
"npm-run-path/path-key": ["path-key@4.0.0", "", {}, "sha512-haREypq7xkM7ErfgIyA0z+Bj4AGKlMSdlQE2jvJo6huWD1EdkKYV+G/T4nq0YEF2vgTT8kqMFKo1uHn950r4SQ=="],
"nypm/pkg-types": ["pkg-types@2.3.0", "", { "dependencies": { "confbox": "^0.2.2", "exsolve": "^1.0.7", "pathe": "^2.0.3" } }, "sha512-SIqCzDRg0s9npO5XQ3tNZioRY1uK06lA41ynBC1YmFTmnY6FjUjVt6s4LoADmwoig1qqD0oK8h1p/8mlMx8Oig=="],
"nypm/tinyexec": ["tinyexec@0.3.2", "", {}, "sha512-KQQR9yN7R5+OSwaK0XQoj22pwHoTlgYqmUscPYoknOoWCWfj/5/ABTMRi69FrKU5ffPVh5QcFikpWJI/P1ocHA=="],
"oauth2-mock-server/jose": ["jose@5.10.0", "", {}, "sha512-s+3Al/p9g32Iq+oqXxkW//7jk2Vig6FF1CFqzVXoTUXt2qz89YWbL+OwS17NFYEvxC35n0FKeGO2LGYSxeM2Gg=="],
@@ -3595,6 +3607,8 @@
"unicode-trie/pako": ["pako@0.2.9", "", {}, "sha512-NUcwaKxUxWrZLpDG+z/xZaCgQITkA/Dv4V/T6bw7VON6l1Xz/VnrBqrYjZQ12TamKHzITTfOEIYUj48y2KXImA=="],
"vite-tsconfig-paths/tsconfck": ["tsconfck@3.1.6", "", { "peerDependencies": { "typescript": "^5.0.0" }, "optionalPeers": ["typescript"], "bin": { "tsconfck": "bin/tsconfck.js" } }, "sha512-ks6Vjr/jEw0P1gmOVwutM3B7fWxoWBL2KRDb1JfqGVawBmO5UsvmWOQFGHBPl5yxYz4eERr19E6L7NMv+Fej4w=="],
"vitest/tinyexec": ["tinyexec@0.3.2", "", {}, "sha512-KQQR9yN7R5+OSwaK0XQoj22pwHoTlgYqmUscPYoknOoWCWfj/5/ABTMRi69FrKU5ffPVh5QcFikpWJI/P1ocHA=="],
"xml-crypto/xpath": ["xpath@0.0.33", "", {}, "sha512-NNXnzrkDrAzalLhIUc01jO2mOzXGXh1JwPgkihcLLzw98c0WgYDmmjSh1Kl3wzaxSVWMuA+fe0WTWOBDWCBmNA=="],
@@ -3793,6 +3807,8 @@
"log-update/wrap-ansi/string-width": ["string-width@5.1.2", "", { "dependencies": { "eastasianwidth": "^0.2.0", "emoji-regex": "^9.2.2", "strip-ansi": "^7.0.1" } }, "sha512-HnLOCR3vjcY8beoNLtcjZ5/nxn2afmME6lhrDrebokqMap+XbeW8n9TXpPDOqdGK5qcI3oT0GKTW6wC7EMiVqA=="],
"nypm/pkg-types/confbox": ["confbox@0.2.2", "", {}, "sha512-1NB+BKqhtNipMsov4xI/NnhCKp9XG9NamYp5PVm9klAT0fsrNPjaFICsCFhNhwZJKNh7zB/3q8qXz0E9oaMNtQ=="],
"openai/@types/node/undici-types": ["undici-types@5.26.5", "", {}, "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA=="],
"openai/node-fetch/whatwg-url": ["whatwg-url@5.0.0", "", { "dependencies": { "tr46": "~0.0.3", "webidl-conversions": "^3.0.0" } }, "sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw=="],
@@ -3943,4 +3959,4 @@
"lint-staged/listr2/log-update/cli-cursor/restore-cursor/onetime": ["onetime@7.0.0", "", { "dependencies": { "mimic-function": "^5.0.0" } }, "sha512-VXJjc87FScF88uafS3JllDgvAm+c/Slfz06lorj2uAY34rlUu0Nt+v8wreiImcrgAjjIHp1rXpTDlLOGw29WwQ=="],
}
}
}

View File

@@ -200,8 +200,9 @@ ingress:
# Pod disruption budget for high availability
podDisruptionBudget:
enabled: true
minAvailable: 1
minAvailable: null
maxUnavailable: 1
unhealthyPodEvictionPolicy: AlwaysAllow
# Network policies
networkPolicy:
enabled: true

View File

@@ -122,8 +122,9 @@ autoscaling:
podDisruptionBudget:
enabled: true
minAvailable: 1
minAvailable: null
maxUnavailable: 1
unhealthyPodEvictionPolicy: AlwaysAllow
monitoring:
serviceMonitor:
enabled: true

View File

@@ -201,8 +201,9 @@ ingress:
# Pod disruption budget for high availability
podDisruptionBudget:
enabled: true
minAvailable: 1
minAvailable: null
maxUnavailable: 1
unhealthyPodEvictionPolicy: AlwaysAllow
# Network policies
networkPolicy:
enabled: true

View File

@@ -165,7 +165,9 @@ autoscaling:
# Pod disruption budget (ensures minimum availability during cluster maintenance)
podDisruptionBudget:
enabled: true
minAvailable: 1
minAvailable: null
maxUnavailable: 1
unhealthyPodEvictionPolicy: AlwaysAllow
# Monitoring integration with Prometheus
monitoring:

View File

@@ -0,0 +1,35 @@
{{- if and .Values.postgresql.enabled .Values.postgresql.tls.enabled }}
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
name: {{ include "sim.fullname" . }}-postgresql-tls-certificate
namespace: {{ .Release.Namespace }}
labels:
{{- include "sim.postgresql.labels" . | nindent 4 }}
spec:
secretName: {{ .Values.postgresql.tls.certificatesSecret }}
duration: {{ .Values.postgresql.tls.duration | default "87600h" }} # Default: 10 years
renewBefore: {{ .Values.postgresql.tls.renewBefore | default "2160h" }} # Default: 90 days before expiry
isCA: false
{{- if .Values.postgresql.tls.rotationPolicy }}
rotationPolicy: {{ .Values.postgresql.tls.rotationPolicy }}
{{- end }}
privateKey:
algorithm: {{ .Values.postgresql.tls.privateKey.algorithm | default "RSA" }}
size: {{ .Values.postgresql.tls.privateKey.size | default 4096 }}
usages:
- server auth
- client auth
dnsNames:
- {{ include "sim.fullname" . }}-postgresql
- {{ include "sim.fullname" . }}-postgresql.{{ .Release.Namespace }}.svc.cluster.local
{{- with .Values.postgresql.tls.additionalDnsNames }}
{{- toYaml . | nindent 2 }}
{{- end }}
issuerRef:
name: {{ .Values.postgresql.tls.issuerRef.name }}
kind: {{ .Values.postgresql.tls.issuerRef.kind | default "ClusterIssuer" }}
{{- if .Values.postgresql.tls.issuerRef.group }}
group: {{ .Values.postgresql.tls.issuerRef.group }}
{{- end }}
{{- end }}

View File

@@ -41,6 +41,9 @@ spec:
securityContext:
{{- toYaml . | nindent 14 }}
{{- end }}
env:
- name: CRON_SECRET
value: {{ $.Values.app.env.CRON_SECRET | quote }}
command:
- /bin/sh
- -c
@@ -58,6 +61,7 @@ spec:
if curl -f -s -S --max-time 60 --retry 2 --retry-delay 5 \
-H "Content-Type: application/json" \
-H "User-Agent: Kubernetes-CronJob/{{ $jobConfig.name }}" \
-H "Authorization: Bearer ${CRON_SECRET}" \
"$SERVICE_URL{{ $jobConfig.path }}"; then
echo "Success: HTTP request completed"
exit 0

View File

@@ -0,0 +1,52 @@
{{- if and .Values.podDisruptionBudget.enabled .Values.app.enabled }}
{{- with .Values.podDisruptionBudget }}
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: {{ include "sim.fullname" $ }}-app-pdb
namespace: {{ $.Release.Namespace }}
labels:
{{- include "sim.app.labels" $ | nindent 4 }}
spec:
{{- if .minAvailable }}
minAvailable: {{ .minAvailable }}
{{- else if .maxUnavailable }}
maxUnavailable: {{ .maxUnavailable }}
{{- else }}
maxUnavailable: 1
{{- end }}
{{- if .unhealthyPodEvictionPolicy }}
unhealthyPodEvictionPolicy: {{ .unhealthyPodEvictionPolicy }}
{{- end }}
selector:
matchLabels:
{{- include "sim.app.selectorLabels" $ | nindent 6 }}
{{- end }}
{{- end }}
{{- if and .Values.podDisruptionBudget.enabled .Values.realtime.enabled }}
{{- with .Values.podDisruptionBudget }}
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: {{ include "sim.fullname" $ }}-realtime-pdb
namespace: {{ $.Release.Namespace }}
labels:
{{- include "sim.realtime.labels" $ | nindent 4 }}
spec:
{{- if .minAvailable }}
minAvailable: {{ .minAvailable }}
{{- else if .maxUnavailable }}
maxUnavailable: {{ .maxUnavailable }}
{{- else }}
maxUnavailable: 1
{{- end }}
{{- if .unhealthyPodEvictionPolicy }}
unhealthyPodEvictionPolicy: {{ .unhealthyPodEvictionPolicy }}
{{- end }}
selector:
matchLabels:
{{- include "sim.realtime.selectorLabels" $ | nindent 6 }}
{{- end }}
{{- end }}

View File

@@ -290,9 +290,28 @@ postgresql:
- ReadWriteOnce
# SSL/TLS configuration (enable for production deployments with certificates)
# Requires cert-manager to be installed in the cluster
tls:
enabled: false
certificatesSecret: postgres-tls-secret
# Certificate configuration (only used if enabled)
duration: "87600h" # 10 years (default)
renewBefore: "2160h" # Renew 90 days before expiry (default)
rotationPolicy: "" # Set to "Always" to rotate private key on renewal (recommended for security)
privateKey:
algorithm: RSA # RSA or ECDSA
size: 4096 # Key size in bits
# Issuer reference (REQUIRED if tls.enabled is true)
issuerRef:
name: selfsigned-cluster-issuer # Name of your cert-manager Issuer/ClusterIssuer
kind: ClusterIssuer # ClusterIssuer or Issuer
group: "" # Optional: cert-manager.io (leave empty for default)
# Additional DNS names (optional)
additionalDnsNames: []
# Example:
# additionalDnsNames:
# - postgres.example.com
# - db.example.com
# PostgreSQL configuration
config:
@@ -495,9 +514,20 @@ autoscaling:
behavior: {}
# Pod disruption budget
# Note: PDBs only protect against voluntary disruptions (node drains, autoscaler)
# They do NOT affect rolling updates - use deployment.strategy.rollingUpdate for that
podDisruptionBudget:
enabled: false
minAvailable: 1
# Use either minAvailable or maxUnavailable (not both)
# Recommendation: Use maxUnavailable as it scales better with HPA
# - minAvailable: minimum pods that must remain available (e.g., 1, "50%")
# - maxUnavailable: maximum pods that can be unavailable (e.g., 1, "25%")
minAvailable: null
maxUnavailable: 1
# unhealthyPodEvictionPolicy: allows eviction of unhealthy pods during node drains
# Options: IfHealthyBudget (default) | AlwaysAllow (recommended for production)
# Set to null to use K8s default (IfHealthyBudget)
unhealthyPodEvictionPolicy: null
# Monitoring configuration
monitoring: