fix(schedules): disable schedules after consecutive failures (#368)

* fix(schedules): disable schedules after consecutive failures

* addressed PR comments
Author: Waleed Latif
Date: 2025-05-17 01:18:11 -07:00
Committed by: GitHub
Parent: 0b055623a0
Commit: e57d3f79a1
16 changed files with 2930 additions and 420 deletions
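
In short: each schedule row now carries failedCount, lastFailedAt and a status column; failures increment the counter, success resets it, and three consecutive failures flip the schedule to 'disabled' so the cron query skips it until it is reactivated. A condensed sketch of that logic (not the literal route code, which runs the updates through Drizzle against workflow_schedule):

// Condensed sketch of the failure-tracking logic this commit adds to the
// scheduled-execution route; names mirror the diff below.
const MAX_CONSECUTIVE_FAILURES = 3

interface ScheduleRow {
  failedCount: number
  status: 'active' | 'disabled'
  lastFailedAt: Date | null
}

function applyRunResult(schedule: ScheduleRow, succeeded: boolean, now: Date): ScheduleRow {
  if (succeeded) {
    // A successful run clears the consecutive-failure streak.
    return { ...schedule, failedCount: 0 }
  }
  const failedCount = (schedule.failedCount || 0) + 1
  return {
    ...schedule,
    failedCount,
    lastFailedAt: now,
    // Disabled schedules are excluded from the cron query and can be
    // reactivated via PUT /api/schedules/[id] with { action: 'reactivate' }.
    status: failedCount >= MAX_CONSECUTIVE_FAILURES ? 'disabled' : 'active',
  }
}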

View File

@@ -319,3 +319,111 @@ export async function getMockedDependencies() {
db: dbModule.db,
}
}
export function mockScheduleStatusDb({
schedule = [
{
id: 'schedule-id',
workflowId: 'workflow-id',
status: 'active',
failedCount: 0,
lastRanAt: new Date('2024-01-01T00:00:00.000Z'),
lastFailedAt: null,
nextRunAt: new Date('2024-01-02T00:00:00.000Z'),
},
],
workflow = [
{
userId: 'user-id',
},
],
}: {
schedule?: any[]
workflow?: any[]
} = {}) {
vi.doMock('@/db', () => {
let callCount = 0
const select = vi.fn().mockImplementation(() => ({
from: vi.fn().mockImplementation(() => ({
where: vi.fn().mockImplementation(() => ({
limit: vi.fn().mockImplementation(() => {
callCount += 1
if (callCount === 1) return schedule
if (callCount === 2) return workflow
return []
}),
})),
})),
}))
return {
db: { select },
}
})
}
export function mockScheduleExecuteDb({
schedules = [] as any[],
workflowRecord = {
id: 'workflow-id',
userId: 'user-id',
state: sampleWorkflowState,
},
envRecord = {
userId: 'user-id',
variables: {
OPENAI_API_KEY: 'encrypted:openai-api-key',
SERPER_API_KEY: 'encrypted:serper-api-key',
},
},
}: {
schedules?: any[]
workflowRecord?: any
envRecord?: any
}): void {
vi.doMock('@/db', () => {
const select = vi.fn().mockImplementation(() => ({
from: vi.fn().mockImplementation((table: any) => {
const tbl = String(table)
if (tbl === 'workflow_schedule' || tbl === 'schedule') {
return {
where: vi.fn().mockImplementation(() => ({
limit: vi.fn().mockImplementation(() => schedules),
})),
}
}
if (tbl === 'workflow') {
return {
where: vi.fn().mockImplementation(() => ({
limit: vi.fn().mockImplementation(() => [workflowRecord]),
})),
}
}
if (tbl === 'environment') {
return {
where: vi.fn().mockImplementation(() => ({
limit: vi.fn().mockImplementation(() => [envRecord]),
})),
}
}
return {
where: vi.fn().mockImplementation(() => ({
limit: vi.fn().mockImplementation(() => []),
})),
}
}),
}))
const update = vi.fn().mockImplementation(() => ({
set: vi.fn().mockImplementation(() => ({
where: vi.fn().mockResolvedValue([]),
})),
}))
return { db: { select, update } }
})
}

View File

@@ -6,7 +6,7 @@ import { createLogger } from '@/lib/logs/console-logger'
import { db } from '@/db'
import { workflow, workflowSchedule } from '@/db/schema'
const logger = createLogger('ScheduleDeleteAPI')
const logger = createLogger('ScheduleAPI')
export const dynamic = 'force-dynamic'
@@ -63,3 +63,88 @@ export async function DELETE(
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}
/**
* Update a schedule - can be used to reactivate a disabled schedule
*/
export async function PUT(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = crypto.randomUUID().slice(0, 8)
try {
const { id } = await params
const scheduleId = id
logger.debug(`[${requestId}] Updating schedule with ID: ${scheduleId}`)
const session = await getSession()
if (!session?.user?.id) {
logger.warn(`[${requestId}] Unauthorized schedule update attempt`)
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const body = await request.json()
const { action } = body
const [schedule] = await db
.select({
id: workflowSchedule.id,
workflowId: workflowSchedule.workflowId,
status: workflowSchedule.status,
})
.from(workflowSchedule)
.where(eq(workflowSchedule.id, scheduleId))
.limit(1)
if (!schedule) {
logger.warn(`[${requestId}] Schedule not found: ${scheduleId}`)
return NextResponse.json({ error: 'Schedule not found' }, { status: 404 })
}
const [workflowRecord] = await db
.select({ userId: workflow.userId })
.from(workflow)
.where(eq(workflow.id, schedule.workflowId))
.limit(1)
if (!workflowRecord) {
logger.warn(`[${requestId}] Workflow not found for schedule: ${scheduleId}`)
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
if (workflowRecord.userId !== session.user.id) {
logger.warn(`[${requestId}] User not authorized to modify this schedule: ${scheduleId}`)
return NextResponse.json({ error: 'Not authorized to modify this schedule' }, { status: 403 })
}
if (action === 'reactivate' || (body.status && body.status === 'active')) {
if (schedule.status === 'active') {
return NextResponse.json({ message: 'Schedule is already active' }, { status: 200 })
}
const now = new Date()
const nextRunAt = new Date(now.getTime() + 60 * 1000) // Schedule to run in 1 minute
await db
.update(workflowSchedule)
.set({
status: 'active',
failedCount: 0,
updatedAt: now,
nextRunAt,
})
.where(eq(workflowSchedule.id, scheduleId))
logger.info(`[${requestId}] Reactivated schedule: ${scheduleId}`)
return NextResponse.json({
message: 'Schedule activated successfully',
nextRunAt,
})
}
logger.warn(`[${requestId}] Unsupported update action for schedule: ${scheduleId}`)
return NextResponse.json({ error: 'Unsupported update action' }, { status: 400 })
} catch (error) {
logger.error(`[${requestId}] Error updating schedule`, error)
return NextResponse.json({ error: 'Failed to update schedule' }, { status: 500 })
}
}
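
For reference, a disabled schedule can be re-enabled with a request like the one below; this mirrors the fetch the workflow-block badge makes later in this diff, and the schedule id is a placeholder:

// Illustrative client call; 'schedule-id' stands in for a real schedule id.
await fetch('/api/schedules/schedule-id', {
  method: 'PUT',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ action: 'reactivate' }),
})
// On success the handler resets failedCount to 0, schedules the next run one
// minute out, and responds with { message: 'Schedule activated successfully', nextRunAt }.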

View File

@@ -0,0 +1,143 @@
/**
* Integration tests for schedule status API route
*
* @vitest-environment node
*/
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { createMockRequest, mockScheduleStatusDb } from '@/app/api/__test-utils__/utils'
// Common mocks
const mockSchedule = {
id: 'schedule-id',
workflowId: 'workflow-id',
status: 'active',
failedCount: 0,
lastRanAt: new Date('2024-01-01T00:00:00.000Z'),
lastFailedAt: null,
nextRunAt: new Date('2024-01-02T00:00:00.000Z'),
}
beforeEach(() => {
vi.resetModules()
vi.doMock('@/lib/logs/console-logger', () => ({
createLogger: () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }),
}))
vi.doMock('crypto', () => ({
randomUUID: vi.fn(() => 'test-uuid'),
default: { randomUUID: vi.fn(() => 'test-uuid') },
}))
})
afterEach(() => {
vi.clearAllMocks()
})
describe('Schedule Status API Route', () => {
it('returns schedule status successfully', async () => {
mockScheduleStatusDb({}) // default mocks
vi.doMock('@/lib/auth', () => ({
getSession: vi.fn().mockResolvedValue({ user: { id: 'user-id' } }),
}))
const req = createMockRequest('GET')
const { GET } = await import('./route')
const res = await GET(req, { params: Promise.resolve({ id: 'schedule-id' }) })
expect(res.status).toBe(200)
const data = await res.json()
expect(data).toMatchObject({
status: 'active',
failedCount: 0,
nextRunAt: mockSchedule.nextRunAt.toISOString(),
isDisabled: false,
})
})
it('marks disabled schedules with isDisabled = true', async () => {
mockScheduleStatusDb({ schedule: [{ ...mockSchedule, status: 'disabled' }] })
vi.doMock('@/lib/auth', () => ({
getSession: vi.fn().mockResolvedValue({ user: { id: 'user-id' } }),
}))
const req = createMockRequest('GET')
const { GET } = await import('./route')
const res = await GET(req, { params: Promise.resolve({ id: 'schedule-id' }) })
expect(res.status).toBe(200)
const data = await res.json()
expect(data).toHaveProperty('status', 'disabled')
expect(data).toHaveProperty('isDisabled', true)
expect(data).toHaveProperty('lastFailedAt')
})
it('returns 404 if schedule not found', async () => {
mockScheduleStatusDb({ schedule: [] })
vi.doMock('@/lib/auth', () => ({
getSession: vi.fn().mockResolvedValue({ user: { id: 'user-id' } }),
}))
const req = createMockRequest('GET')
const { GET } = await import('./route')
const res = await GET(req, { params: Promise.resolve({ id: 'missing-id' }) })
expect(res.status).toBe(404)
const data = await res.json()
expect(data).toHaveProperty('error', 'Schedule not found')
})
it('returns 404 if related workflow not found', async () => {
mockScheduleStatusDb({ workflow: [] })
vi.doMock('@/lib/auth', () => ({
getSession: vi.fn().mockResolvedValue({ user: { id: 'user-id' } }),
}))
const req = createMockRequest('GET')
const { GET } = await import('./route')
const res = await GET(req, { params: Promise.resolve({ id: 'schedule-id' }) })
expect(res.status).toBe(404)
const data = await res.json()
expect(data).toHaveProperty('error', 'Workflow not found')
})
it('returns 403 when user is not owner of workflow', async () => {
mockScheduleStatusDb({ workflow: [{ userId: 'another-user' }] })
vi.doMock('@/lib/auth', () => ({
getSession: vi.fn().mockResolvedValue({ user: { id: 'user-id' } }),
}))
const req = createMockRequest('GET')
const { GET } = await import('./route')
const res = await GET(req, { params: Promise.resolve({ id: 'schedule-id' }) })
expect(res.status).toBe(403)
const data = await res.json()
expect(data).toHaveProperty('error', 'Not authorized to view this schedule')
})
it('returns 401 when user is not authenticated', async () => {
mockScheduleStatusDb({})
vi.doMock('@/lib/auth', () => ({
getSession: vi.fn().mockResolvedValue(null),
}))
const req = createMockRequest('GET')
const { GET } = await import('./route')
const res = await GET(req, { params: Promise.resolve({ id: 'schedule-id' }) })
expect(res.status).toBe(401)
const data = await res.json()
expect(data).toHaveProperty('error', 'Unauthorized')
})
})

View File

@@ -0,0 +1,69 @@
import { NextRequest, NextResponse } from 'next/server'
import { eq } from 'drizzle-orm'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console-logger'
import { db } from '@/db'
import { workflow, workflowSchedule } from '@/db/schema'
const logger = createLogger('ScheduleStatusAPI')
export async function GET(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = crypto.randomUUID().slice(0, 8)
const { id } = await params
const scheduleId = id
try {
const session = await getSession()
if (!session?.user?.id) {
logger.warn(`[${requestId}] Unauthorized schedule status request`)
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const [schedule] = await db
.select({
id: workflowSchedule.id,
workflowId: workflowSchedule.workflowId,
status: workflowSchedule.status,
failedCount: workflowSchedule.failedCount,
lastRanAt: workflowSchedule.lastRanAt,
lastFailedAt: workflowSchedule.lastFailedAt,
nextRunAt: workflowSchedule.nextRunAt,
})
.from(workflowSchedule)
.where(eq(workflowSchedule.id, scheduleId))
.limit(1)
if (!schedule) {
logger.warn(`[${requestId}] Schedule not found: ${scheduleId}`)
return NextResponse.json({ error: 'Schedule not found' }, { status: 404 })
}
const [workflowRecord] = await db
.select({ userId: workflow.userId })
.from(workflow)
.where(eq(workflow.id, schedule.workflowId))
.limit(1)
if (!workflowRecord) {
logger.warn(`[${requestId}] Workflow not found for schedule: ${scheduleId}`)
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
if (workflowRecord.userId !== session.user.id) {
logger.warn(`[${requestId}] User not authorized to view this schedule: ${scheduleId}`)
return NextResponse.json({ error: 'Not authorized to view this schedule' }, { status: 403 })
}
return NextResponse.json({
status: schedule.status,
failedCount: schedule.failedCount,
lastRanAt: schedule.lastRanAt,
lastFailedAt: schedule.lastFailedAt,
nextRunAt: schedule.nextRunAt,
isDisabled: schedule.status === 'disabled',
})
} catch (error) {
logger.error(`[${requestId}] Error retrieving schedule status: ${scheduleId}`, error)
return NextResponse.json({ error: 'Failed to retrieve schedule status' }, { status: 500 })
}
}
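
For an authorized owner the handler above responds with a payload shaped like this (values are illustrative):

// Illustrative response body for a schedule disabled after repeated failures.
const exampleStatusResponse = {
  status: 'disabled',
  failedCount: 3,
  lastRanAt: '2024-01-01T00:00:00.000Z',
  lastFailedAt: '2024-01-01T06:00:00.000Z',
  nextRunAt: '2024-01-02T00:00:00.000Z',
  isDisabled: true,
}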

View File

@@ -7,6 +7,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import {
createMockRequest,
mockExecutionDependencies,
mockScheduleExecuteDb,
sampleWorkflowState,
} from '@/app/api/__test-utils__/utils'
@@ -14,17 +15,14 @@ describe('Scheduled Workflow Execution API Route', () => {
beforeEach(() => {
vi.resetModules()
// Mock all dependencies
mockExecutionDependencies()
// Mock the Cron library
vi.doMock('croner', () => ({
Cron: vi.fn().mockImplementation(() => ({
nextRun: vi.fn().mockReturnValue(new Date(Date.now() + 60000)), // Next run in 1 minute
})),
}))
// Create mock database with test schedules
vi.doMock('@/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => ({
@@ -94,11 +92,7 @@ describe('Scheduled Workflow Execution API Route', () => {
vi.clearAllMocks()
})
/**
* Test the basic flow of checking and executing scheduled workflows
*/
it('should execute scheduled workflows successfully', async () => {
// Create executor mock to track calls
const executeMock = vi.fn().mockResolvedValue({
success: true,
output: { response: 'Scheduled execution completed' },
@@ -116,67 +110,41 @@ describe('Scheduled Workflow Execution API Route', () => {
})),
}))
// Create a mock request
const req = createMockRequest('GET')
// Import the route handler after mocks are set up
const { GET } = await import('./route')
// Call the handler
const response = await GET(req)
// Verify the response exists
expect(response).toBeDefined()
// Validate that the API responds with a structured response
const data = await response.json()
expect(data).toHaveProperty('message')
expect(data).toHaveProperty('executedCount')
})
/**
* Test error handling during execution of scheduled workflows
*/
it('should handle errors during scheduled execution gracefully', async () => {
// Create a mock for persistent execution error
const persistExecutionErrorMock = vi.fn().mockResolvedValue(undefined)
// Mock the logger
vi.doMock('@/lib/logs/execution-logger', () => ({
persistExecutionLogs: vi.fn().mockResolvedValue(undefined),
persistExecutionError: persistExecutionErrorMock,
}))
// Mock the executor to throw an error
vi.doMock('@/executor', () => ({
Executor: vi.fn().mockImplementation(() => ({
execute: vi.fn().mockRejectedValue(new Error('Execution failed')),
})),
}))
// Create a mock request
const req = createMockRequest('GET')
// Import the route handler after mocks are set up
const { GET } = await import('./route')
// Call the handler
const response = await GET(req)
// Verify response exists
expect(response).toBeDefined()
// Validate that errors during execution don't crash the API
// It should still return a valid response
const data = await response.json()
expect(data).toHaveProperty('message')
})
/**
* Test behavior when no schedules are due for execution
*/
it('should handle case with no due schedules', async () => {
// Mock empty schedules list
vi.doMock('@/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => ({
@@ -196,25 +164,13 @@ describe('Scheduled Workflow Execution API Route', () => {
return { db: mockDb }
})
// Create a mock request
const req = createMockRequest('GET')
// Import the route handler after mocks are set up
const { GET } = await import('./route')
// Call the handler
const response = await GET(req)
// Check response
expect(response.status).toBe(200)
// Parse the response body
const data = await response.json()
// Should report zero executed workflows
expect(data).toHaveProperty('executedCount', 0)
// Create executor mock to verify it wasn't called
const executeMock = vi.fn()
vi.doMock('@/executor', () => ({
Executor: vi.fn().mockImplementation(() => ({
@@ -222,15 +178,10 @@ describe('Scheduled Workflow Execution API Route', () => {
})),
}))
// Verify executor wasn't called since there were no schedules
expect(executeMock).not.toHaveBeenCalled()
})
/**
* Test handling of database-level errors
*/
it('should handle scheduler-level errors gracefully', async () => {
// Mock the database to throw an error
vi.doMock('@/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => {
@@ -242,22 +193,78 @@ describe('Scheduled Workflow Execution API Route', () => {
return { db: mockDb }
})
// Create a mock request
const req = createMockRequest('GET')
// Import the route handler after mocks are set up
const { GET } = await import('./route')
// Call the handler
const response = await GET(req)
// Check response - should be an error
expect(response.status).toBe(500)
// Parse the response body
const data = await response.json()
// Should contain error information
expect(data).toHaveProperty('error', 'Database error')
})
it('should execute schedules that are explicitly marked as active', async () => {
const executeMock = vi.fn().mockResolvedValue({ success: true, metadata: {} })
vi.doMock('@/executor', () => ({
Executor: vi.fn().mockImplementation(() => ({
execute: executeMock,
})),
}))
mockScheduleExecuteDb({
schedules: [
{
id: 'schedule-active',
workflowId: 'workflow-id',
userId: 'user-id',
status: 'active',
nextRunAt: new Date(Date.now() - 60_000),
lastRanAt: null,
cronExpression: null,
failedCount: 0,
},
],
})
const req = createMockRequest('GET')
const { GET } = await import('./route')
const response = await GET(req)
expect(response.status).toBe(200)
})
it('should not execute schedules that are disabled', async () => {
const executeMock = vi.fn()
vi.doMock('@/executor', () => ({
Executor: vi.fn().mockImplementation(() => ({
execute: executeMock,
})),
}))
mockScheduleExecuteDb({
schedules: [
{
id: 'schedule-disabled',
workflowId: 'workflow-id',
userId: 'user-id',
status: 'disabled',
nextRunAt: new Date(Date.now() - 60_000),
lastRanAt: null,
cronExpression: null,
failedCount: 0,
},
],
})
const req = createMockRequest('GET')
const { GET } = await import('./route')
const response = await GET(req)
expect(response.status).toBe(200)
const data = await response.json()
expect(data).toHaveProperty('executedCount', 0)
expect(executeMock).not.toHaveBeenCalled()
})
})

View File

@@ -1,6 +1,6 @@
import { NextRequest, NextResponse } from 'next/server'
import { Cron } from 'croner'
import { eq, lte } from 'drizzle-orm'
import { and, eq, lte, not } from 'drizzle-orm'
import { sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
@@ -28,6 +28,9 @@ export const dynamic = 'force-dynamic'
const logger = createLogger('ScheduledExecuteAPI')
// Maximum number of consecutive failures before disabling a schedule
const MAX_CONSECUTIVE_FAILURES = 3
/**
* Calculate the next run time for a schedule
* This is a wrapper around the utility function in schedule-utils.ts
@@ -36,17 +39,11 @@ function calculateNextRunTime(
schedule: typeof workflowSchedule.$inferSelect,
blocks: Record<string, BlockState>
): Date {
// Find the starter block
const starterBlock = Object.values(blocks).find((block) => block.type === 'starter')
if (!starterBlock) throw new Error('No starter block found')
// Get schedule type from the starter block
const scheduleType = getSubBlockValue(starterBlock, 'scheduleType')
// Get all schedule values
const scheduleValues = getScheduleTimeValues(starterBlock)
// If there's a cron expression, use croner to calculate next run
if (schedule.cronExpression) {
const cron = new Cron(schedule.cronExpression)
const nextDate = cron.nextRun()
@@ -54,19 +51,14 @@ function calculateNextRunTime(
return nextDate
}
// Calculate next run time with our helper function
// We pass the lastRanAt from the schedule to help calculate accurate next run time
const lastRanAt = schedule.lastRanAt ? new Date(schedule.lastRanAt) : null
return calculateNextTime(scheduleType, scheduleValues, lastRanAt)
}
// Define the schema for environment variables
const EnvVarsSchema = z.record(z.string())
// Keep track of running executions to prevent overlap
const runningExecutions = new Set<string>()
// Add GET handler for cron job
export async function GET(req: NextRequest) {
logger.info(`Scheduled execution triggered at ${new Date().toISOString()}`)
const requestId = crypto.randomUUID().slice(0, 8)
@@ -75,13 +67,20 @@ export async function GET(req: NextRequest) {
let dueSchedules: (typeof workflowSchedule.$inferSelect)[] = []
try {
// Query schedules due for execution
dueSchedules = await db
.select()
.from(workflowSchedule)
.where(lte(workflowSchedule.nextRunAt, now))
// Limit to 10 workflows per minute to prevent overload
.limit(10)
try {
dueSchedules = await db
.select()
.from(workflowSchedule)
.where(
and(lte(workflowSchedule.nextRunAt, now), not(eq(workflowSchedule.status, 'disabled')))
)
.limit(10)
logger.debug(`[${requestId}] Successfully queried schedules: ${dueSchedules.length} found`)
} catch (queryError) {
logger.error(`[${requestId}] Error in schedule query:`, queryError)
throw queryError
}
logger.info(`[${requestId}] Processing ${dueSchedules.length} due scheduled workflows`)
@@ -89,7 +88,6 @@ export async function GET(req: NextRequest) {
const executionId = uuidv4()
try {
// Skip if this workflow is already running
if (runningExecutions.has(schedule.workflowId)) {
logger.debug(`[${requestId}] Skipping workflow ${schedule.workflowId} - already running`)
continue
@@ -98,7 +96,6 @@ export async function GET(req: NextRequest) {
runningExecutions.add(schedule.workflowId)
logger.debug(`[${requestId}] Starting execution of workflow ${schedule.workflowId}`)
// Retrieve the workflow record
const [workflowRecord] = await db
.select()
.from(workflow)
@@ -111,7 +108,6 @@ export async function GET(req: NextRequest) {
continue
}
// Check if the user has exceeded their usage limits
const usageCheck = await checkServerSideUsageLimits(workflowRecord.userId)
if (usageCheck.isExceeded) {
logger.warn(
@@ -123,7 +119,6 @@ export async function GET(req: NextRequest) {
}
)
// Log an execution error for the user to see why their schedule was skipped
await persistExecutionError(
schedule.workflowId,
executionId,
@@ -134,30 +129,32 @@ export async function GET(req: NextRequest) {
'schedule'
)
// Update the next run time to avoid constant retries
const retryDelay = 24 * 60 * 60 * 1000 // 24 hour delay for exceeded limits
const nextRetryAt = new Date(now.getTime() + retryDelay)
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, schedule.id))
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(`[${requestId}] Updated next retry time due to usage limits`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule for usage limits:`, updateError)
}
runningExecutions.delete(schedule.workflowId)
continue
}
// The state in the database is exactly what we store in localStorage
const state = workflowRecord.state as WorkflowState
const { blocks, edges, loops } = state
// Use the same execution flow as in use-workflow-execution.ts
const mergedStates = mergeSubblockState(blocks)
// Retrieve environment variables for this user
const [userEnv] = await db
.select()
.from(environment)
@@ -171,10 +168,8 @@ export async function GET(req: NextRequest) {
throw new Error('No environment variables found for this user')
}
// Parse and validate environment variables
const variables = EnvVarsSchema.parse(userEnv.variables)
// Replace environment variables in the block states
const currentBlockStates = await Object.entries(mergedStates).reduce(
async (accPromise, [id, block]) => {
const acc = await accPromise
@@ -183,13 +178,11 @@ export async function GET(req: NextRequest) {
const subAcc = await subAccPromise
let value = subBlock.value
// If the value is a string and contains environment variable syntax
if (typeof value === 'string' && value.includes('{{') && value.includes('}}')) {
const matches = value.match(/{{([^}]+)}}/g)
if (matches) {
// Process all matches sequentially
for (const match of matches) {
const varName = match.slice(2, -2) // Remove {{ and }}
const varName = match.slice(2, -2)
const encryptedValue = variables[varName]
if (!encryptedValue) {
throw new Error(`Environment variable "${varName}" was not found`)
@@ -221,7 +214,6 @@ export async function GET(req: NextRequest) {
Promise.resolve({} as Record<string, Record<string, any>>)
)
// Create a map of decrypted environment variables
const decryptedEnvVars: Record<string, string> = {}
for (const [key, encryptedValue] of Object.entries(variables)) {
try {
@@ -233,27 +225,20 @@ export async function GET(req: NextRequest) {
}
}
// Serialize and execute the workflow
const serializedWorkflow = new Serializer().serializeWorkflow(mergedStates, edges, loops)
// Add workflowId to the input for OAuth credential resolution
const input = {
workflowId: schedule.workflowId,
// Add _context with workflowId to ensure OAuth token resolution works properly
_context: {
workflowId: schedule.workflowId,
},
}
// Process the block states to ensure response formats are properly parsed
// This is crucial for agent blocks with response format
const processedBlockStates = Object.entries(currentBlockStates).reduce(
(acc, [blockId, blockState]) => {
// Check if this block has a responseFormat that needs to be parsed
if (blockState.responseFormat && typeof blockState.responseFormat === 'string') {
try {
logger.debug(`[${requestId}] Parsing responseFormat for block ${blockId}`)
// Attempt to parse the responseFormat if it's a string
const parsedResponseFormat = JSON.parse(blockState.responseFormat)
acc[blockId] = {
@@ -277,15 +262,12 @@ export async function GET(req: NextRequest) {
logger.info(`[${requestId}] Executing workflow ${schedule.workflowId}`)
// Get workflow variables
let workflowVariables = {}
if (workflowRecord.variables) {
try {
// Parse workflow variables if they're stored as a string
if (typeof workflowRecord.variables === 'string') {
workflowVariables = JSON.parse(workflowRecord.variables)
} else {
// Otherwise use as is (already parsed JSON)
workflowVariables = workflowRecord.variables
}
logger.debug(
@@ -296,7 +278,6 @@ export async function GET(req: NextRequest) {
`[${requestId}] Failed to parse workflow variables: ${schedule.workflowId}`,
error
)
// Continue execution even if variables can't be parsed
}
} else {
logger.debug(`[${requestId}] No workflow variables found for: ${schedule.workflowId}`)
@@ -304,15 +285,13 @@ export async function GET(req: NextRequest) {
const executor = new Executor(
serializedWorkflow,
processedBlockStates, // Use the processed block states
processedBlockStates,
decryptedEnvVars,
input,
workflowVariables
)
const result = await executor.execute(schedule.workflowId)
// Check if we got a StreamingExecution result (with stream + execution properties)
// For scheduled executions, we only care about the ExecutionResult part, not the stream
const executionResult =
'stream' in result && 'execution' in result ? result.execution : result
@@ -321,69 +300,89 @@ export async function GET(req: NextRequest) {
executionTime: executionResult.metadata?.duration,
})
// Update workflow run counts if execution was successful
if (executionResult.success) {
await updateWorkflowRunCounts(schedule.workflowId)
// Track scheduled execution in user stats
await db
.update(userStats)
.set({
totalScheduledExecutions: sql`total_scheduled_executions + 1`,
lastActive: new Date(),
})
.where(eq(userStats.userId, workflowRecord.userId))
try {
await db
.update(userStats)
.set({
totalScheduledExecutions: sql`total_scheduled_executions + 1`,
lastActive: now,
})
.where(eq(userStats.userId, workflowRecord.userId))
logger.debug(`[${requestId}] Updated user stats for scheduled execution`)
} catch (statsError) {
logger.error(`[${requestId}] Error updating user stats:`, statsError)
}
}
// Build trace spans from execution logs
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
// Add trace spans to the execution result
const enrichedResult = {
...executionResult,
traceSpans,
totalDuration,
}
// Log each execution step and the final result
await persistExecutionLogs(schedule.workflowId, executionId, enrichedResult, 'schedule')
// Only update next_run_at if execution was successful
if (executionResult.success) {
logger.info(`[${requestId}] Workflow ${schedule.workflowId} executed successfully`)
// Calculate the next run time based on the schedule configuration
const nextRunAt = calculateNextRunTime(schedule, blocks)
logger.debug(
`[${requestId}] Calculated next run time: ${nextRunAt.toISOString()} for workflow ${schedule.workflowId}`
)
// Update the schedule with the next run time
await db
.update(workflowSchedule)
.set({
lastRanAt: now,
updatedAt: now,
nextRunAt,
})
.where(eq(workflowSchedule.id, schedule.id))
try {
await db
.update(workflowSchedule)
.set({
lastRanAt: now,
updatedAt: now,
nextRunAt,
failedCount: 0, // Reset failure count on success
})
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(
`[${requestId}] Updated next run time for workflow ${schedule.workflowId} to ${nextRunAt.toISOString()}`
)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after success:`, updateError)
}
} else {
logger.warn(`[${requestId}] Workflow ${schedule.workflowId} execution failed`)
// If execution failed, increment next_run_at by a small delay to prevent immediate retries
const retryDelay = 1 * 60 * 1000 // 1 minute delay
const nextRetryAt = new Date(now.getTime() + retryDelay)
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, schedule.id))
const newFailedCount = (schedule.failedCount || 0) + 1
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
const nextRunAt = calculateNextRunTime(schedule, blocks)
if (shouldDisable) {
logger.warn(
`[${requestId}] Disabling schedule for workflow ${schedule.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
)
}
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
status: shouldDisable ? 'disabled' : 'active',
})
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(`[${requestId}] Updated schedule after failure`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after failure:`, updateError)
}
}
} catch (error: any) {
logger.error(
@@ -391,20 +390,56 @@ export async function GET(req: NextRequest) {
error
)
// Log the error
await persistExecutionError(schedule.workflowId, executionId, error, 'schedule')
// On error, increment next_run_at by a small delay to prevent immediate retries
const retryDelay = 1 * 60 * 1000 // 1 minute delay
const nextRetryAt = new Date(now.getTime() + retryDelay)
let nextRunAt: Date
try {
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, schedule.workflowId))
.limit(1)
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, schedule.id))
if (workflowRecord) {
const state = workflowRecord.state as WorkflowState
const { blocks } = state
nextRunAt = calculateNextRunTime(schedule, blocks)
} else {
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
} catch (workflowError) {
logger.error(
`[${requestId}] Error retrieving workflow for next run calculation`,
workflowError
)
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000) // 24 hours as a fallback
}
const newFailedCount = (schedule.failedCount || 0) + 1
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
if (shouldDisable) {
logger.warn(
`[${requestId}] Disabling schedule for workflow ${schedule.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
)
}
try {
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt,
failedCount: newFailedCount,
lastFailedAt: now,
status: shouldDisable ? 'disabled' : 'active',
})
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(`[${requestId}] Updated schedule after execution error`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule after execution error:`, updateError)
}
} finally {
runningExecutions.delete(schedule.workflowId)
}

View File

@@ -97,6 +97,9 @@ describe('Schedule Configuration API Route', () => {
// Fix imports for route.ts
vi.doMock('crypto', () => ({
randomUUID: vi.fn(() => 'test-uuid'),
default: {
randomUUID: vi.fn(() => 'test-uuid'),
},
}))
// Mock the schedule utils

View File

@@ -1,17 +1,63 @@
import { NextRequest, NextResponse } from 'next/server'
import crypto from 'crypto'
import { eq } from 'drizzle-orm'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console-logger'
import {
BlockState,
calculateNextRunTime,
generateCronExpression,
getScheduleTimeValues,
getSubBlockValue,
} from '@/lib/schedules/utils'
import { db } from '@/db'
import { workflowSchedule } from '@/db/schema'
const logger = createLogger('ScheduledAPI')
const ScheduleRequestSchema = z.object({
workflowId: z.string(),
state: z.object({
blocks: z.record(z.any()),
edges: z.array(z.any()),
loops: z.record(z.any()),
}),
})
// Track recent requests to reduce redundant logging
const recentRequests = new Map<string, number>()
const LOGGING_THROTTLE_MS = 5000 // 5 seconds between logging for the same workflow
function hasValidScheduleConfig(
scheduleType: string | undefined,
scheduleValues: ReturnType<typeof getScheduleTimeValues>,
starterBlock: BlockState
): boolean {
switch (scheduleType) {
case 'minutes':
return !!scheduleValues.minutesInterval
case 'hourly':
return scheduleValues.hourlyMinute !== undefined
case 'daily':
return !!scheduleValues.dailyTime[0] || !!scheduleValues.dailyTime[1]
case 'weekly':
return (
!!scheduleValues.weeklyDay &&
(!!scheduleValues.weeklyTime[0] || !!scheduleValues.weeklyTime[1])
)
case 'monthly':
return (
!!scheduleValues.monthlyDay &&
(!!scheduleValues.monthlyTime[0] || !!scheduleValues.monthlyTime[1])
)
case 'custom':
return !!getSubBlockValue(starterBlock, 'cronExpression')
default:
return false
}
}
/**
* Get schedule information for a workflow
*/
@@ -21,7 +67,6 @@ export async function GET(req: NextRequest) {
const workflowId = url.searchParams.get('workflowId')
const mode = url.searchParams.get('mode')
// Skip processing if mode is provided and not 'schedule'
if (mode && mode !== 'schedule') {
return NextResponse.json({ schedule: null })
}
@@ -37,7 +82,6 @@ export async function GET(req: NextRequest) {
return NextResponse.json({ error: 'Missing workflowId parameter' }, { status: 400 })
}
// Check if we should log this request (throttle logging for repeat requests)
const now = Date.now()
const lastLog = recentRequests.get(workflowId) || 0
const shouldLog = now - lastLog > LOGGING_THROTTLE_MS
@@ -47,14 +91,12 @@ export async function GET(req: NextRequest) {
recentRequests.set(workflowId, now)
}
// Find the schedule for this workflow
const schedule = await db
.select()
.from(workflowSchedule)
.where(eq(workflowSchedule.workflowId, workflowId))
.limit(1)
// Set cache control headers to reduce repeated API calls
const headers = new Headers()
headers.set('Cache-Control', 'max-age=30') // Cache for 30 seconds
@@ -62,9 +104,153 @@ export async function GET(req: NextRequest) {
return NextResponse.json({ schedule: null }, { headers })
}
return NextResponse.json({ schedule: schedule[0] }, { headers })
const scheduleData = schedule[0]
const isDisabled = scheduleData.status === 'disabled'
const hasFailures = scheduleData.failedCount > 0
return NextResponse.json(
{
schedule: scheduleData,
isDisabled,
hasFailures,
canBeReactivated: isDisabled,
},
{ headers }
)
} catch (error) {
logger.error(`[${requestId}] Error retrieving workflow schedule`, error)
return NextResponse.json({ error: 'Failed to retrieve workflow schedule' }, { status: 500 })
}
}
/**
* Create or update a schedule for a workflow
*/
export async function POST(req: NextRequest) {
const requestId = crypto.randomUUID().slice(0, 8)
try {
const session = await getSession()
if (!session?.user?.id) {
logger.warn(`[${requestId}] Unauthorized schedule update attempt`)
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const body = await req.json()
const { workflowId, state } = ScheduleRequestSchema.parse(body)
logger.info(`[${requestId}] Processing schedule update for workflow ${workflowId}`)
const starterBlock = Object.values(state.blocks).find(
(block: any) => block.type === 'starter'
) as BlockState | undefined
if (!starterBlock) {
logger.warn(`[${requestId}] No starter block found in workflow ${workflowId}`)
return NextResponse.json({ error: 'No starter block found in workflow' }, { status: 400 })
}
const startWorkflow = getSubBlockValue(starterBlock, 'startWorkflow')
const scheduleType = getSubBlockValue(starterBlock, 'scheduleType')
const scheduleValues = getScheduleTimeValues(starterBlock)
const hasScheduleConfig = hasValidScheduleConfig(scheduleType, scheduleValues, starterBlock)
if (startWorkflow !== 'schedule' && !hasScheduleConfig) {
logger.info(
`[${requestId}] Removing schedule for workflow ${workflowId} - no valid configuration found`
)
await db.delete(workflowSchedule).where(eq(workflowSchedule.workflowId, workflowId))
return NextResponse.json({ message: 'Schedule removed' })
}
if (startWorkflow !== 'schedule') {
logger.info(
`[${requestId}] Setting workflow to scheduled mode based on schedule configuration`
)
}
logger.debug(`[${requestId}] Schedule type for workflow ${workflowId}: ${scheduleType}`)
let cronExpression: string | null = null
let nextRunAt: Date | undefined
const timezone = getSubBlockValue(starterBlock, 'timezone') || 'UTC'
try {
const defaultScheduleType = scheduleType || 'daily'
const scheduleStartAt = getSubBlockValue(starterBlock, 'scheduleStartAt')
const scheduleTime = getSubBlockValue(starterBlock, 'scheduleTime')
logger.debug(`[${requestId}] Schedule configuration:`, {
type: defaultScheduleType,
timezone,
startDate: scheduleStartAt || 'not specified',
time: scheduleTime || 'not specified',
})
cronExpression = generateCronExpression(defaultScheduleType, scheduleValues)
nextRunAt = calculateNextRunTime(defaultScheduleType, scheduleValues)
logger.debug(
`[${requestId}] Generated cron: ${cronExpression}, next run at: ${nextRunAt.toISOString()}`
)
} catch (error) {
logger.error(`[${requestId}] Error generating schedule: ${error}`)
return NextResponse.json({ error: 'Failed to generate schedule' }, { status: 400 })
}
const values = {
id: crypto.randomUUID(),
workflowId,
cronExpression,
triggerType: 'schedule',
createdAt: new Date(),
updatedAt: new Date(),
nextRunAt,
timezone,
status: 'active', // Ensure new schedules are active
failedCount: 0, // Reset failure count for new schedules
}
const setValues = {
cronExpression,
updatedAt: new Date(),
nextRunAt,
timezone,
status: 'active', // Reactivate if previously disabled
failedCount: 0, // Reset failure count on reconfiguration
}
await db
.insert(workflowSchedule)
.values(values)
.onConflictDoUpdate({
target: [workflowSchedule.workflowId],
set: setValues,
})
logger.info(`[${requestId}] Schedule updated for workflow ${workflowId}`, {
nextRunAt: nextRunAt?.toISOString(),
cronExpression,
})
return NextResponse.json({
message: 'Schedule updated',
nextRunAt,
cronExpression,
})
} catch (error) {
logger.error(`[${requestId}] Error updating workflow schedule`, error)
if (error instanceof z.ZodError) {
return NextResponse.json(
{ error: 'Invalid request data', details: error.errors },
{ status: 400 }
)
}
return NextResponse.json({ error: 'Failed to update workflow schedule' }, { status: 500 })
}
}

View File

@@ -1,203 +0,0 @@
import { NextRequest, NextResponse } from 'next/server'
import { eq } from 'drizzle-orm'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console-logger'
import {
BlockState,
calculateNextRunTime,
generateCronExpression,
getScheduleTimeValues,
getSubBlockValue,
} from '@/lib/schedules/utils'
import { db } from '@/db'
import { workflowSchedule } from '@/db/schema'
const logger = createLogger('ScheduledScheduleAPI')
// Schema for schedule request
const ScheduleRequestSchema = z.object({
workflowId: z.string(),
state: z.object({
blocks: z.record(z.any()),
edges: z.array(z.any()),
loops: z.record(z.any()),
}),
})
export async function POST(req: NextRequest) {
const requestId = crypto.randomUUID().slice(0, 8)
try {
const session = await getSession()
if (!session?.user?.id) {
logger.warn(`[${requestId}] Unauthorized schedule update attempt`)
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const body = await req.json()
const { workflowId, state } = ScheduleRequestSchema.parse(body)
logger.info(`[${requestId}] Processing schedule update for workflow ${workflowId}`)
// Find the starter block to check if it's configured for scheduling
const starterBlock = Object.values(state.blocks).find(
(block: any) => block.type === 'starter'
) as BlockState | undefined
if (!starterBlock) {
logger.warn(`[${requestId}] No starter block found in workflow ${workflowId}`)
return NextResponse.json({ error: 'No starter block found in workflow' }, { status: 400 })
}
const startWorkflow = getSubBlockValue(starterBlock, 'startWorkflow')
const scheduleType = getSubBlockValue(starterBlock, 'scheduleType')
// Check if there's a valid schedule configuration using the helper function
const scheduleValues = getScheduleTimeValues(starterBlock)
// Determine if there's a valid schedule configuration
const hasScheduleConfig = (() => {
switch (scheduleType) {
case 'minutes':
return !!scheduleValues.minutesInterval
case 'hourly':
return scheduleValues.hourlyMinute !== undefined
case 'daily':
return !!scheduleValues.dailyTime[0] || !!scheduleValues.dailyTime[1]
case 'weekly':
return (
!!scheduleValues.weeklyDay &&
(!!scheduleValues.weeklyTime[0] || !!scheduleValues.weeklyTime[1])
)
case 'monthly':
return (
!!scheduleValues.monthlyDay &&
(!!scheduleValues.monthlyTime[0] || !!scheduleValues.monthlyTime[1])
)
case 'custom':
return !!getSubBlockValue(starterBlock, 'cronExpression')
default:
return false
}
})()
// If the workflow is not configured for scheduling, delete any existing schedule
if (startWorkflow !== 'schedule' && !hasScheduleConfig) {
logger.info(
`[${requestId}] Removing schedule for workflow ${workflowId} - no valid configuration found`
)
await db.delete(workflowSchedule).where(eq(workflowSchedule.workflowId, workflowId))
return NextResponse.json({ message: 'Schedule removed' })
}
// If we're here, we either have startWorkflow === 'schedule' or hasScheduleConfig is true
if (startWorkflow !== 'schedule') {
logger.info(
`[${requestId}] Setting workflow to scheduled mode based on schedule configuration`
)
// The UI should handle this, but as a fallback we'll assume the user intended to schedule
// the workflow even if startWorkflow wasn't set properly
}
// Get schedule configuration from starter block
logger.debug(`[${requestId}] Schedule type for workflow ${workflowId}: ${scheduleType}`)
// First check if there's an existing schedule
const existingSchedule = await db
.select()
.from(workflowSchedule)
.where(eq(workflowSchedule.workflowId, workflowId))
.limit(1)
// Generate cron expression and calculate next run time
let cronExpression: string | null = null
let nextRunAt: Date | undefined
const timezone = getSubBlockValue(starterBlock, 'timezone') || 'UTC'
try {
// Get schedule type from starter block
const scheduleType = getSubBlockValue(starterBlock, 'scheduleType') || 'daily'
// Get schedule values
const scheduleValues = getScheduleTimeValues(starterBlock)
// Get the schedule start date
const scheduleStartAt = getSubBlockValue(starterBlock, 'scheduleStartAt')
// Get the schedule time
const scheduleTime = getSubBlockValue(starterBlock, 'scheduleTime')
logger.debug(`[${requestId}] Schedule configuration:`, {
type: scheduleType,
timezone,
startDate: scheduleStartAt || 'not specified',
time: scheduleTime || 'not specified',
})
// Get cron expression based on schedule type
cronExpression = generateCronExpression(scheduleType, scheduleValues)
// Calculate next run time with timezone awareness
nextRunAt = calculateNextRunTime(scheduleType, scheduleValues)
logger.debug(
`[${requestId}] Generated cron: ${cronExpression}, next run at: ${nextRunAt.toISOString()}`
)
} catch (error) {
logger.error(`[${requestId}] Error generating schedule: ${error}`)
return NextResponse.json({ error: 'Failed to generate schedule' }, { status: 400 })
}
// Prepare the values for upsert
const values = {
id: crypto.randomUUID(),
workflowId,
cronExpression,
triggerType: 'schedule',
createdAt: new Date(),
updatedAt: new Date(),
nextRunAt,
timezone,
}
// Prepare the set values for update
const setValues = {
cronExpression,
updatedAt: new Date(),
nextRunAt,
timezone,
}
// Upsert the schedule
await db
.insert(workflowSchedule)
.values(values)
.onConflictDoUpdate({
target: [workflowSchedule.workflowId],
set: setValues,
})
logger.info(`[${requestId}] Schedule updated for workflow ${workflowId}`, {
nextRunAt: nextRunAt?.toISOString(),
cronExpression,
})
return NextResponse.json({
message: 'Schedule updated',
nextRunAt,
cronExpression,
})
} catch (error) {
logger.error(`[${requestId}] Error updating workflow schedule`, error)
if (error instanceof z.ZodError) {
return NextResponse.json(
{ error: 'Invalid request data', details: error.errors },
{ status: 400 }
)
}
return NextResponse.json({ error: 'Failed to update workflow schedule' }, { status: 500 })
}
}

View File

@@ -169,7 +169,7 @@ export function ScheduleConfig({ blockId, subBlockId, isConnecting }: ScheduleCo
// 4. Make a direct API call instead of relying on sync
// This gives us more control and better error handling
logger.debug('Making direct API call to save schedule with complete state')
const response = await fetch(`/api/schedules/schedule`, {
const response = await fetch(`/api/schedules`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',

View File

@@ -37,7 +37,11 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
nextRunAt: string | null
lastRanAt: string | null
timezone: string
status?: string
isDisabled?: boolean
id?: string
} | null>(null)
const [isLoadingScheduleInfo, setIsLoadingScheduleInfo] = useState(false)
const [webhookInfo, setWebhookInfo] = useState<{
webhookPath: string
provider: string
@@ -56,7 +60,6 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
)
const isWide = useWorkflowStore((state) => state.blocks[id]?.isWide ?? false)
const blockHeight = useWorkflowStore((state) => state.blocks[id]?.height ?? 0)
const hasActiveSchedule = useWorkflowStore((state) => state.hasActiveSchedule ?? false)
const hasActiveWebhook = useWorkflowStore((state) => state.hasActiveWebhook ?? false)
// Workflow store actions
@@ -68,49 +71,106 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
const isActiveBlock = useExecutionStore((state) => state.activeBlockIds.has(id))
const isActive = dataIsActive || isActiveBlock
// Get schedule information for the tooltip
useEffect(() => {
if (type === 'starter' && hasActiveSchedule) {
const fetchScheduleInfo = async () => {
try {
const workflowId = useWorkflowRegistry.getState().activeWorkflowId
if (!workflowId) return
const reactivateSchedule = async (scheduleId: string) => {
try {
const response = await fetch(`/api/schedules/${scheduleId}`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ action: 'reactivate' }),
})
const response = await fetch(`/api/schedules?workflowId=${workflowId}&mode=schedule`, {
cache: 'no-store',
headers: {
'Cache-Control': 'no-cache',
},
})
if (response.ok) {
fetchScheduleInfo()
} else {
console.error('Failed to reactivate schedule')
}
} catch (error) {
console.error('Error reactivating schedule:', error)
}
}
if (response.ok) {
const data = await response.json()
if (data.schedule) {
let scheduleTiming = 'Unknown schedule'
if (data.schedule.cronExpression) {
scheduleTiming = parseCronToHumanReadable(data.schedule.cronExpression)
}
const fetchScheduleInfo = async () => {
try {
setIsLoadingScheduleInfo(true)
const workflowId = useWorkflowRegistry.getState().activeWorkflowId
if (!workflowId) return
setScheduleInfo({
scheduleTiming,
nextRunAt: data.schedule.nextRunAt,
lastRanAt: data.schedule.lastRanAt,
timezone: data.schedule.timezone || 'UTC',
})
}
}
} catch (error) {
console.error('Error fetching schedule info:', error)
}
const response = await fetch(`/api/schedules?workflowId=${workflowId}&mode=schedule`, {
cache: 'no-store',
headers: {
'Cache-Control': 'no-cache',
},
})
if (!response.ok) {
setScheduleInfo(null)
return
}
const data = await response.json()
if (!data.schedule) {
setScheduleInfo(null)
return
}
let scheduleTiming = 'Unknown schedule'
if (data.schedule.cronExpression) {
scheduleTiming = parseCronToHumanReadable(data.schedule.cronExpression)
}
const baseInfo = {
scheduleTiming,
nextRunAt: data.schedule.nextRunAt as string | null,
lastRanAt: data.schedule.lastRanAt as string | null,
timezone: data.schedule.timezone || 'UTC',
status: data.schedule.status as string,
isDisabled: data.schedule.status === 'disabled',
id: data.schedule.id as string,
}
try {
const statusRes = await fetch(`/api/schedules/${baseInfo.id}/status`, {
cache: 'no-store',
headers: { 'Cache-Control': 'no-cache' },
})
if (statusRes.ok) {
const statusData = await statusRes.json()
setScheduleInfo({
scheduleTiming: baseInfo.scheduleTiming,
nextRunAt: statusData.nextRunAt ?? baseInfo.nextRunAt,
lastRanAt: statusData.lastRanAt ?? baseInfo.lastRanAt,
timezone: baseInfo.timezone,
status: statusData.status ?? baseInfo.status,
isDisabled: statusData.isDisabled ?? baseInfo.isDisabled,
id: baseInfo.id,
})
return
}
} catch (err) {
console.error('Error fetching schedule status:', err)
}
setScheduleInfo(baseInfo)
} catch (error) {
console.error('Error fetching schedule info:', error)
setScheduleInfo(null)
} finally {
setIsLoadingScheduleInfo(false)
}
}
useEffect(() => {
if (type === 'starter') {
fetchScheduleInfo()
} else if (!hasActiveSchedule) {
} else {
setScheduleInfo(null)
}
}, [type, hasActiveSchedule])
}, [type])
// Get webhook information for the tooltip
useEffect(() => {
if (type === 'starter' && hasActiveWebhook) {
const fetchWebhookInfo = async () => {
@@ -140,12 +200,10 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
}
}, [type, hasActiveWebhook])
// Update node internals when handles change
useEffect(() => {
updateNodeInternals(id)
}, [id, horizontalHandles, updateNodeInternals])
// Add debounce helper
const debounce = (func: Function, wait: number) => {
let timeout: NodeJS.Timeout
return (...args: any[]) => {
@@ -154,7 +212,6 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
}
}
// Add effect to observe size changes with debounced updates
useEffect(() => {
if (!contentRef.current) return
@@ -167,12 +224,10 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
}, 100)
const resizeObserver = new ResizeObserver((entries) => {
// Cancel any pending animation frame
if (rafId) {
cancelAnimationFrame(rafId)
}
// Schedule the update on the next animation frame
rafId = requestAnimationFrame(() => {
for (const entry of entries) {
const height =
@@ -286,9 +341,8 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
}
}
// Check if this is a starter block and has active schedule or webhook
// Check if this is a starter block and if we need to show schedule / webhook indicators
const isStarterBlock = type === 'starter'
const showScheduleIndicator = isStarterBlock && hasActiveSchedule
const showWebhookIndicator = isStarterBlock && hasActiveWebhook
const getProviderName = (providerId: string): string => {
@@ -305,6 +359,8 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
return providers[providerId] || 'Webhook'
}
const shouldShowScheduleBadge = isStarterBlock && !isLoadingScheduleInfo && scheduleInfo !== null
return (
<div className="relative group">
<Card
@@ -400,25 +456,51 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
</Badge>
)}
{/* Schedule indicator badge - displayed for starter blocks with active schedules */}
{showScheduleIndicator && (
{shouldShowScheduleBadge && (
<Tooltip>
<TooltipTrigger asChild>
<Badge
variant="outline"
className="flex items-center gap-1 text-green-600 bg-green-50 border-green-200 hover:bg-green-50 dark:bg-green-900/20 dark:text-green-400 font-normal text-xs"
className={cn(
'flex items-center gap-1 font-normal text-xs',
scheduleInfo?.isDisabled
? 'text-amber-600 bg-amber-50 border-amber-200 hover:bg-amber-100 dark:bg-amber-900/20 dark:text-amber-400 cursor-pointer'
: 'text-green-600 bg-green-50 border-green-200 hover:bg-green-50 dark:bg-green-900/20 dark:text-green-400'
)}
onClick={
scheduleInfo?.isDisabled && scheduleInfo?.id
? () => reactivateSchedule(scheduleInfo.id!)
: undefined
}
>
<div className="relative flex items-center justify-center mr-0.5">
<div className="absolute h-3 w-3 rounded-full bg-green-500/20"></div>
<div className="relative h-2 w-2 rounded-full bg-green-500"></div>
<div
className={cn(
'absolute h-3 w-3 rounded-full',
scheduleInfo?.isDisabled ? 'bg-amber-500/20' : 'bg-green-500/20'
)}
></div>
<div
className={cn(
'relative h-2 w-2 rounded-full',
scheduleInfo?.isDisabled ? 'bg-amber-500' : 'bg-green-500'
)}
></div>
</div>
Scheduled
{scheduleInfo?.isDisabled ? 'Disabled' : 'Scheduled'}
</Badge>
</TooltipTrigger>
<TooltipContent side="top" className="max-w-[300px] p-4">
{scheduleInfo ? (
<>
<p className="text-sm">{scheduleInfo.scheduleTiming}</p>
{scheduleInfo.nextRunAt && (
{scheduleInfo.isDisabled && (
<p className="text-sm text-amber-600 font-medium mt-1">
This schedule is currently disabled due to consecutive failures. Click the
badge to reactivate it.
</p>
)}
{scheduleInfo.nextRunAt && !scheduleInfo.isDisabled && (
<p className="text-xs text-muted-foreground mt-1">
Next run:{' '}
{formatDateTime(new Date(scheduleInfo.nextRunAt), scheduleInfo.timezone)}

View File

@@ -0,0 +1,3 @@
ALTER TABLE "workflow_schedule" ADD COLUMN "failed_count" integer DEFAULT 0 NOT NULL;--> statement-breakpoint
ALTER TABLE "workflow_schedule" ADD COLUMN "status" text DEFAULT 'active' NOT NULL;--> statement-breakpoint
ALTER TABLE "workflow_schedule" ADD COLUMN "last_failed_at" timestamp;

File diff suppressed because it is too large.

View File

@@ -260,6 +260,13 @@
"when": 1747265680027,
"tag": "0036_married_skreet",
"breakpoints": true
},
{
"idx": 37,
"version": "7",
"when": 1747460441992,
"tag": "0037_outgoing_madame_hydra",
"breakpoints": true
}
]
}

View File

@@ -156,6 +156,9 @@ export const workflowSchedule = pgTable('workflow_schedule', {
lastRanAt: timestamp('last_ran_at'),
triggerType: text('trigger_type').notNull(), // "manual", "webhook", "schedule"
timezone: text('timezone').notNull().default('UTC'),
failedCount: integer('failed_count').notNull().default(0), // Track consecutive failures
status: text('status').notNull().default('active'), // 'active' or 'disabled'
lastFailedAt: timestamp('last_failed_at'), // When the schedule last failed
createdAt: timestamp('created_at').notNull().defaultNow(),
updatedAt: timestamp('updated_at').notNull().defaultNow(),
})

View File

@@ -7,7 +7,7 @@ export const STORAGE_KEYS = {
export const API_ENDPOINTS = {
SYNC: '/api/workflows/sync',
ENVIRONMENT: '/api/environment',
SCHEDULE: '/api/schedules/schedule',
SCHEDULE: '/api/schedules',
SETTINGS: '/api/settings',
WORKFLOWS: '/api/workflows',
}