mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
fix(schedules-perms): use regular perm system to view/edit schedule info (#901)
* fix(schedules-perms): use regular perm system to view schedule info * fix perms * improve logging
This commit is contained in:
committed by
GitHub
parent
d264a6ade8
commit
fd3ca87c38
@@ -3,6 +3,7 @@ import { eq } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { createLogger } from '@/lib/logs/console/logger'
|
||||
import { getUserEntityPermissions } from '@/lib/permissions/utils'
|
||||
import { db } from '@/db'
|
||||
import { workflow, workflowSchedule } from '@/db/schema'
|
||||
|
||||
@@ -36,6 +37,7 @@ export async function DELETE(
|
||||
workflow: {
|
||||
id: workflow.id,
|
||||
userId: workflow.userId,
|
||||
workspaceId: workflow.workspaceId,
|
||||
},
|
||||
})
|
||||
.from(workflowSchedule)
|
||||
@@ -48,7 +50,22 @@ export async function DELETE(
|
||||
return NextResponse.json({ error: 'Schedule not found' }, { status: 404 })
|
||||
}
|
||||
|
||||
if (schedules[0].workflow.userId !== session.user.id) {
|
||||
const workflowRecord = schedules[0].workflow
|
||||
|
||||
// Check authorization - either the user owns the workflow or has write/admin workspace permissions
|
||||
let isAuthorized = workflowRecord.userId === session.user.id
|
||||
|
||||
// If not authorized by ownership and the workflow belongs to a workspace, check workspace permissions
|
||||
if (!isAuthorized && workflowRecord.workspaceId) {
|
||||
const userPermission = await getUserEntityPermissions(
|
||||
session.user.id,
|
||||
'workspace',
|
||||
workflowRecord.workspaceId
|
||||
)
|
||||
isAuthorized = userPermission === 'write' || userPermission === 'admin'
|
||||
}
|
||||
|
||||
if (!isAuthorized) {
|
||||
logger.warn(`[${requestId}] Unauthorized schedule deletion attempt for schedule: ${id}`)
|
||||
return NextResponse.json({ error: 'Unauthorized' }, { status: 403 })
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ import { eq } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { createLogger } from '@/lib/logs/console/logger'
|
||||
import { getUserEntityPermissions } from '@/lib/permissions/utils'
|
||||
|
||||
export const dynamic = 'force-dynamic'
|
||||
|
||||
@@ -42,7 +43,7 @@ export async function GET(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
}
|
||||
|
||||
const [workflowRecord] = await db
|
||||
.select({ userId: workflow.userId })
|
||||
.select({ userId: workflow.userId, workspaceId: workflow.workspaceId })
|
||||
.from(workflow)
|
||||
.where(eq(workflow.id, schedule.workflowId))
|
||||
.limit(1)
|
||||
@@ -52,7 +53,20 @@ export async function GET(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
|
||||
}
|
||||
|
||||
if (workflowRecord.userId !== session.user.id) {
|
||||
// Check authorization - either the user owns the workflow or has workspace permissions
|
||||
let isAuthorized = workflowRecord.userId === session.user.id
|
||||
|
||||
// If not authorized by ownership and the workflow belongs to a workspace, check workspace permissions
|
||||
if (!isAuthorized && workflowRecord.workspaceId) {
|
||||
const userPermission = await getUserEntityPermissions(
|
||||
session.user.id,
|
||||
'workspace',
|
||||
workflowRecord.workspaceId
|
||||
)
|
||||
isAuthorized = userPermission !== null
|
||||
}
|
||||
|
||||
if (!isAuthorized) {
|
||||
logger.warn(`[${requestId}] User not authorized to view this schedule: ${scheduleId}`)
|
||||
return NextResponse.json({ error: 'Not authorized to view this schedule' }, { status: 403 })
|
||||
}
|
||||
|
||||
@@ -209,248 +209,284 @@ export async function GET() {
|
||||
requestId
|
||||
)
|
||||
|
||||
// Load workflow data from normalized tables (no fallback to deprecated state column)
|
||||
logger.debug(
|
||||
`[${requestId}] Loading workflow ${schedule.workflowId} from normalized tables`
|
||||
)
|
||||
const normalizedData = await loadWorkflowFromNormalizedTables(schedule.workflowId)
|
||||
|
||||
if (!normalizedData) {
|
||||
logger.error(
|
||||
`[${requestId}] No normalized data found for scheduled workflow ${schedule.workflowId}`
|
||||
)
|
||||
throw new Error(
|
||||
`Workflow data not found in normalized tables for ${schedule.workflowId}`
|
||||
)
|
||||
}
|
||||
|
||||
// Use normalized data only
|
||||
const blocks = normalizedData.blocks
|
||||
const edges = normalizedData.edges
|
||||
const loops = normalizedData.loops
|
||||
const parallels = normalizedData.parallels
|
||||
logger.info(
|
||||
`[${requestId}] Loaded scheduled workflow ${schedule.workflowId} from normalized tables`
|
||||
)
|
||||
|
||||
const mergedStates = mergeSubblockState(blocks)
|
||||
|
||||
// Retrieve environment variables for this user (if any).
|
||||
const [userEnv] = await db
|
||||
.select()
|
||||
.from(environmentTable)
|
||||
.where(eq(environmentTable.userId, workflowRecord.userId))
|
||||
.limit(1)
|
||||
|
||||
if (!userEnv) {
|
||||
try {
|
||||
// Load workflow data from normalized tables (no fallback to deprecated state column)
|
||||
logger.debug(
|
||||
`[${requestId}] No environment record found for user ${workflowRecord.userId}. Proceeding with empty variables.`
|
||||
`[${requestId}] Loading workflow ${schedule.workflowId} from normalized tables`
|
||||
)
|
||||
}
|
||||
const normalizedData = await loadWorkflowFromNormalizedTables(schedule.workflowId)
|
||||
|
||||
const variables = EnvVarsSchema.parse(userEnv?.variables ?? {})
|
||||
if (!normalizedData) {
|
||||
logger.error(
|
||||
`[${requestId}] No normalized data found for scheduled workflow ${schedule.workflowId}`
|
||||
)
|
||||
throw new Error(
|
||||
`Workflow data not found in normalized tables for ${schedule.workflowId}`
|
||||
)
|
||||
}
|
||||
|
||||
const currentBlockStates = await Object.entries(mergedStates).reduce(
|
||||
async (accPromise, [id, block]) => {
|
||||
const acc = await accPromise
|
||||
acc[id] = await Object.entries(block.subBlocks).reduce(
|
||||
async (subAccPromise, [key, subBlock]) => {
|
||||
const subAcc = await subAccPromise
|
||||
let value = subBlock.value
|
||||
// Use normalized data only
|
||||
const blocks = normalizedData.blocks
|
||||
const edges = normalizedData.edges
|
||||
const loops = normalizedData.loops
|
||||
const parallels = normalizedData.parallels
|
||||
logger.info(
|
||||
`[${requestId}] Loaded scheduled workflow ${schedule.workflowId} from normalized tables`
|
||||
)
|
||||
|
||||
if (typeof value === 'string' && value.includes('{{') && value.includes('}}')) {
|
||||
const matches = value.match(/{{([^}]+)}}/g)
|
||||
if (matches) {
|
||||
for (const match of matches) {
|
||||
const varName = match.slice(2, -2)
|
||||
const encryptedValue = variables[varName]
|
||||
if (!encryptedValue) {
|
||||
throw new Error(`Environment variable "${varName}" was not found`)
|
||||
}
|
||||
const mergedStates = mergeSubblockState(blocks)
|
||||
|
||||
try {
|
||||
const { decrypted } = await decryptSecret(encryptedValue)
|
||||
value = (value as string).replace(match, decrypted)
|
||||
} catch (error: any) {
|
||||
logger.error(
|
||||
`[${requestId}] Error decrypting value for variable "${varName}"`,
|
||||
error
|
||||
)
|
||||
throw new Error(
|
||||
`Failed to decrypt environment variable "${varName}": ${error.message}`
|
||||
)
|
||||
// Retrieve environment variables for this user (if any).
|
||||
const [userEnv] = await db
|
||||
.select()
|
||||
.from(environmentTable)
|
||||
.where(eq(environmentTable.userId, workflowRecord.userId))
|
||||
.limit(1)
|
||||
|
||||
if (!userEnv) {
|
||||
logger.debug(
|
||||
`[${requestId}] No environment record found for user ${workflowRecord.userId}. Proceeding with empty variables.`
|
||||
)
|
||||
}
|
||||
|
||||
const variables = EnvVarsSchema.parse(userEnv?.variables ?? {})
|
||||
|
||||
const currentBlockStates = await Object.entries(mergedStates).reduce(
|
||||
async (accPromise, [id, block]) => {
|
||||
const acc = await accPromise
|
||||
acc[id] = await Object.entries(block.subBlocks).reduce(
|
||||
async (subAccPromise, [key, subBlock]) => {
|
||||
const subAcc = await subAccPromise
|
||||
let value = subBlock.value
|
||||
|
||||
if (
|
||||
typeof value === 'string' &&
|
||||
value.includes('{{') &&
|
||||
value.includes('}}')
|
||||
) {
|
||||
const matches = value.match(/{{([^}]+)}}/g)
|
||||
if (matches) {
|
||||
for (const match of matches) {
|
||||
const varName = match.slice(2, -2)
|
||||
const encryptedValue = variables[varName]
|
||||
if (!encryptedValue) {
|
||||
throw new Error(`Environment variable "${varName}" was not found`)
|
||||
}
|
||||
|
||||
try {
|
||||
const { decrypted } = await decryptSecret(encryptedValue)
|
||||
value = (value as string).replace(match, decrypted)
|
||||
} catch (error: any) {
|
||||
logger.error(
|
||||
`[${requestId}] Error decrypting value for variable "${varName}"`,
|
||||
error
|
||||
)
|
||||
throw new Error(
|
||||
`Failed to decrypt environment variable "${varName}": ${error.message}`
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
subAcc[key] = value
|
||||
return subAcc
|
||||
},
|
||||
Promise.resolve({} as Record<string, any>)
|
||||
)
|
||||
return acc
|
||||
},
|
||||
Promise.resolve({} as Record<string, Record<string, any>>)
|
||||
)
|
||||
subAcc[key] = value
|
||||
return subAcc
|
||||
},
|
||||
Promise.resolve({} as Record<string, any>)
|
||||
)
|
||||
return acc
|
||||
},
|
||||
Promise.resolve({} as Record<string, Record<string, any>>)
|
||||
)
|
||||
|
||||
const decryptedEnvVars: Record<string, string> = {}
|
||||
for (const [key, encryptedValue] of Object.entries(variables)) {
|
||||
try {
|
||||
const { decrypted } = await decryptSecret(encryptedValue)
|
||||
decryptedEnvVars[key] = decrypted
|
||||
} catch (error: any) {
|
||||
logger.error(
|
||||
`[${requestId}] Failed to decrypt environment variable "${key}"`,
|
||||
error
|
||||
)
|
||||
throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`)
|
||||
const decryptedEnvVars: Record<string, string> = {}
|
||||
for (const [key, encryptedValue] of Object.entries(variables)) {
|
||||
try {
|
||||
const { decrypted } = await decryptSecret(encryptedValue)
|
||||
decryptedEnvVars[key] = decrypted
|
||||
} catch (error: any) {
|
||||
logger.error(
|
||||
`[${requestId}] Failed to decrypt environment variable "${key}"`,
|
||||
error
|
||||
)
|
||||
throw new Error(
|
||||
`Failed to decrypt environment variable "${key}": ${error.message}`
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process the block states to ensure response formats are properly parsed
|
||||
const processedBlockStates = Object.entries(currentBlockStates).reduce(
|
||||
(acc, [blockId, blockState]) => {
|
||||
// Check if this block has a responseFormat that needs to be parsed
|
||||
if (blockState.responseFormat && typeof blockState.responseFormat === 'string') {
|
||||
const responseFormatValue = blockState.responseFormat.trim()
|
||||
// Process the block states to ensure response formats are properly parsed
|
||||
const processedBlockStates = Object.entries(currentBlockStates).reduce(
|
||||
(acc, [blockId, blockState]) => {
|
||||
// Check if this block has a responseFormat that needs to be parsed
|
||||
if (blockState.responseFormat && typeof blockState.responseFormat === 'string') {
|
||||
const responseFormatValue = blockState.responseFormat.trim()
|
||||
|
||||
// Check for variable references like <start.input>
|
||||
if (responseFormatValue.startsWith('<') && responseFormatValue.includes('>')) {
|
||||
logger.debug(
|
||||
`[${requestId}] Response format contains variable reference for block ${blockId}`
|
||||
)
|
||||
// Keep variable references as-is - they will be resolved during execution
|
||||
acc[blockId] = blockState
|
||||
} else if (responseFormatValue === '') {
|
||||
// Empty string - remove response format
|
||||
acc[blockId] = {
|
||||
...blockState,
|
||||
responseFormat: undefined,
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
logger.debug(`[${requestId}] Parsing responseFormat for block ${blockId}`)
|
||||
// Attempt to parse the responseFormat if it's a string
|
||||
const parsedResponseFormat = JSON.parse(responseFormatValue)
|
||||
|
||||
acc[blockId] = {
|
||||
...blockState,
|
||||
responseFormat: parsedResponseFormat,
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn(
|
||||
`[${requestId}] Failed to parse responseFormat for block ${blockId}, using undefined`,
|
||||
error
|
||||
// Check for variable references like <start.input>
|
||||
if (responseFormatValue.startsWith('<') && responseFormatValue.includes('>')) {
|
||||
logger.debug(
|
||||
`[${requestId}] Response format contains variable reference for block ${blockId}`
|
||||
)
|
||||
// Set to undefined instead of keeping malformed JSON - this allows execution to continue
|
||||
// Keep variable references as-is - they will be resolved during execution
|
||||
acc[blockId] = blockState
|
||||
} else if (responseFormatValue === '') {
|
||||
// Empty string - remove response format
|
||||
acc[blockId] = {
|
||||
...blockState,
|
||||
responseFormat: undefined,
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
logger.debug(`[${requestId}] Parsing responseFormat for block ${blockId}`)
|
||||
// Attempt to parse the responseFormat if it's a string
|
||||
const parsedResponseFormat = JSON.parse(responseFormatValue)
|
||||
|
||||
acc[blockId] = {
|
||||
...blockState,
|
||||
responseFormat: parsedResponseFormat,
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn(
|
||||
`[${requestId}] Failed to parse responseFormat for block ${blockId}, using undefined`,
|
||||
error
|
||||
)
|
||||
// Set to undefined instead of keeping malformed JSON - this allows execution to continue
|
||||
acc[blockId] = {
|
||||
...blockState,
|
||||
responseFormat: undefined,
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
acc[blockId] = blockState
|
||||
}
|
||||
} else {
|
||||
acc[blockId] = blockState
|
||||
}
|
||||
return acc
|
||||
},
|
||||
{} as Record<string, Record<string, any>>
|
||||
)
|
||||
return acc
|
||||
},
|
||||
{} as Record<string, Record<string, any>>
|
||||
)
|
||||
|
||||
// Get workflow variables
|
||||
let workflowVariables = {}
|
||||
if (workflowRecord.variables) {
|
||||
try {
|
||||
if (typeof workflowRecord.variables === 'string') {
|
||||
workflowVariables = JSON.parse(workflowRecord.variables)
|
||||
} else {
|
||||
workflowVariables = workflowRecord.variables
|
||||
// Get workflow variables
|
||||
let workflowVariables = {}
|
||||
if (workflowRecord.variables) {
|
||||
try {
|
||||
if (typeof workflowRecord.variables === 'string') {
|
||||
workflowVariables = JSON.parse(workflowRecord.variables)
|
||||
} else {
|
||||
workflowVariables = workflowRecord.variables
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`Failed to parse workflow variables: ${schedule.workflowId}`, error)
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`Failed to parse workflow variables: ${schedule.workflowId}`, error)
|
||||
}
|
||||
}
|
||||
|
||||
const serializedWorkflow = new Serializer().serializeWorkflow(
|
||||
mergedStates,
|
||||
edges,
|
||||
loops,
|
||||
parallels,
|
||||
true // Enable validation during execution
|
||||
)
|
||||
const serializedWorkflow = new Serializer().serializeWorkflow(
|
||||
mergedStates,
|
||||
edges,
|
||||
loops,
|
||||
parallels,
|
||||
true // Enable validation during execution
|
||||
)
|
||||
|
||||
const input = {
|
||||
workflowId: schedule.workflowId,
|
||||
_context: {
|
||||
const input = {
|
||||
workflowId: schedule.workflowId,
|
||||
},
|
||||
}
|
||||
|
||||
// Start logging with environment variables
|
||||
await loggingSession.safeStart({
|
||||
userId: workflowRecord.userId,
|
||||
workspaceId: workflowRecord.workspaceId || '',
|
||||
variables: variables || {},
|
||||
})
|
||||
|
||||
const executor = new Executor({
|
||||
workflow: serializedWorkflow,
|
||||
currentBlockStates: processedBlockStates,
|
||||
envVarValues: decryptedEnvVars,
|
||||
workflowInput: input,
|
||||
workflowVariables,
|
||||
contextExtensions: {
|
||||
executionId,
|
||||
workspaceId: workflowRecord.workspaceId || '',
|
||||
},
|
||||
})
|
||||
|
||||
// Set up logging on the executor
|
||||
loggingSession.setupExecutor(executor)
|
||||
|
||||
const result = await executor.execute(
|
||||
schedule.workflowId,
|
||||
schedule.blockId || undefined
|
||||
)
|
||||
|
||||
const executionResult =
|
||||
'stream' in result && 'execution' in result ? result.execution : result
|
||||
|
||||
logger.info(`[${requestId}] Workflow execution completed: ${schedule.workflowId}`, {
|
||||
success: executionResult.success,
|
||||
executionTime: executionResult.metadata?.duration,
|
||||
})
|
||||
|
||||
if (executionResult.success) {
|
||||
await updateWorkflowRunCounts(schedule.workflowId)
|
||||
|
||||
try {
|
||||
await db
|
||||
.update(userStats)
|
||||
.set({
|
||||
totalScheduledExecutions: sql`total_scheduled_executions + 1`,
|
||||
lastActive: now,
|
||||
})
|
||||
.where(eq(userStats.userId, workflowRecord.userId))
|
||||
|
||||
logger.debug(`[${requestId}] Updated user stats for scheduled execution`)
|
||||
} catch (statsError) {
|
||||
logger.error(`[${requestId}] Error updating user stats:`, statsError)
|
||||
_context: {
|
||||
workflowId: schedule.workflowId,
|
||||
},
|
||||
}
|
||||
|
||||
// Start logging with environment variables
|
||||
await loggingSession.safeStart({
|
||||
userId: workflowRecord.userId,
|
||||
workspaceId: workflowRecord.workspaceId || '',
|
||||
variables: variables || {},
|
||||
})
|
||||
|
||||
const executor = new Executor({
|
||||
workflow: serializedWorkflow,
|
||||
currentBlockStates: processedBlockStates,
|
||||
envVarValues: decryptedEnvVars,
|
||||
workflowInput: input,
|
||||
workflowVariables,
|
||||
contextExtensions: {
|
||||
executionId,
|
||||
workspaceId: workflowRecord.workspaceId || '',
|
||||
},
|
||||
})
|
||||
|
||||
// Set up logging on the executor
|
||||
loggingSession.setupExecutor(executor)
|
||||
|
||||
const result = await executor.execute(
|
||||
schedule.workflowId,
|
||||
schedule.blockId || undefined
|
||||
)
|
||||
|
||||
const executionResult =
|
||||
'stream' in result && 'execution' in result ? result.execution : result
|
||||
|
||||
logger.info(`[${requestId}] Workflow execution completed: ${schedule.workflowId}`, {
|
||||
success: executionResult.success,
|
||||
executionTime: executionResult.metadata?.duration,
|
||||
})
|
||||
|
||||
if (executionResult.success) {
|
||||
await updateWorkflowRunCounts(schedule.workflowId)
|
||||
|
||||
try {
|
||||
await db
|
||||
.update(userStats)
|
||||
.set({
|
||||
totalScheduledExecutions: sql`total_scheduled_executions + 1`,
|
||||
lastActive: now,
|
||||
})
|
||||
.where(eq(userStats.userId, workflowRecord.userId))
|
||||
|
||||
logger.debug(`[${requestId}] Updated user stats for scheduled execution`)
|
||||
} catch (statsError) {
|
||||
logger.error(`[${requestId}] Error updating user stats:`, statsError)
|
||||
}
|
||||
}
|
||||
|
||||
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
|
||||
|
||||
// Complete logging
|
||||
await loggingSession.safeComplete({
|
||||
endedAt: new Date().toISOString(),
|
||||
totalDurationMs: totalDuration || 0,
|
||||
finalOutput: executionResult.output || {},
|
||||
traceSpans: (traceSpans || []) as any,
|
||||
})
|
||||
|
||||
return { success: executionResult.success, blocks, executionResult }
|
||||
} catch (earlyError: any) {
|
||||
// Handle errors that occur before workflow execution (e.g., missing data, env vars, etc.)
|
||||
logger.error(
|
||||
`[${requestId}] Early failure in scheduled workflow ${schedule.workflowId}`,
|
||||
earlyError
|
||||
)
|
||||
|
||||
// Create a minimal log entry for early failures
|
||||
try {
|
||||
await loggingSession.safeStart({
|
||||
userId: workflowRecord.userId,
|
||||
workspaceId: workflowRecord.workspaceId || '',
|
||||
variables: {},
|
||||
})
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
message: `Schedule execution failed before workflow started: ${earlyError.message}`,
|
||||
stackTrace: earlyError.stack,
|
||||
})
|
||||
} catch (loggingError) {
|
||||
logger.error(
|
||||
`[${requestId}] Failed to create log entry for early schedule failure`,
|
||||
loggingError
|
||||
)
|
||||
}
|
||||
|
||||
// Re-throw the error to be handled by the outer catch block
|
||||
throw earlyError
|
||||
}
|
||||
|
||||
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
|
||||
|
||||
// Complete logging
|
||||
await loggingSession.safeComplete({
|
||||
endedAt: new Date().toISOString(),
|
||||
totalDurationMs: totalDuration || 0,
|
||||
finalOutput: executionResult.output || {},
|
||||
traceSpans: (traceSpans || []) as any,
|
||||
})
|
||||
|
||||
return { success: executionResult.success, blocks, executionResult }
|
||||
})()
|
||||
|
||||
if (executionSuccess.success) {
|
||||
@@ -539,7 +575,31 @@ export async function GET() {
|
||||
error
|
||||
)
|
||||
|
||||
// Error logging handled by logging session inside sync executor
|
||||
// Ensure we create a log entry for this failed execution
|
||||
try {
|
||||
const failureLoggingSession = new LoggingSession(
|
||||
schedule.workflowId,
|
||||
executionId,
|
||||
'schedule',
|
||||
requestId
|
||||
)
|
||||
|
||||
await failureLoggingSession.safeStart({
|
||||
userId: workflowRecord.userId,
|
||||
workspaceId: workflowRecord.workspaceId || '',
|
||||
variables: {},
|
||||
})
|
||||
|
||||
await failureLoggingSession.safeCompleteWithError({
|
||||
message: `Schedule execution failed: ${error.message}`,
|
||||
stackTrace: error.stack,
|
||||
})
|
||||
} catch (loggingError) {
|
||||
logger.error(
|
||||
`[${requestId}] Failed to create log entry for failed schedule execution`,
|
||||
loggingError
|
||||
)
|
||||
}
|
||||
|
||||
let nextRunAt: Date
|
||||
try {
|
||||
|
||||
@@ -27,6 +27,11 @@ describe('Schedule Configuration API Route', () => {
|
||||
}),
|
||||
}))
|
||||
|
||||
// Mock permissions
|
||||
vi.doMock('@/lib/permissions/utils', () => ({
|
||||
getUserEntityPermissions: vi.fn().mockResolvedValue('admin'), // User has admin permissions
|
||||
}))
|
||||
|
||||
// Extend sampleWorkflowState for scheduling
|
||||
const _workflowStateWithSchedule = {
|
||||
...sampleWorkflowState,
|
||||
@@ -46,33 +51,39 @@ describe('Schedule Configuration API Route', () => {
|
||||
}
|
||||
|
||||
// Create mock database with test schedules
|
||||
// Mock the database to return workflow data for authorization check
|
||||
vi.doMock('@/db', () => {
|
||||
let callCount = 0
|
||||
const mockDb = {
|
||||
select: vi.fn().mockImplementation(() => ({
|
||||
from: vi.fn().mockImplementation((table: string) => {
|
||||
if (table === 'workflow_schedule') {
|
||||
return {
|
||||
where: vi.fn().mockImplementation(() => ({
|
||||
limit: vi.fn().mockImplementation(() => [
|
||||
from: vi.fn().mockImplementation(() => ({
|
||||
where: vi.fn().mockImplementation(() => ({
|
||||
limit: vi.fn().mockImplementation(() => {
|
||||
callCount++
|
||||
// First call: workflow lookup for authorization
|
||||
if (callCount === 1) {
|
||||
return [
|
||||
{
|
||||
id: 'schedule-id',
|
||||
workflowId: 'workflow-id',
|
||||
id: 'workflow-id',
|
||||
userId: 'user-id',
|
||||
nextRunAt: new Date(),
|
||||
lastRanAt: null,
|
||||
cronExpression: '0 9 * * *',
|
||||
triggerType: 'schedule',
|
||||
workspaceId: null, // User owns the workflow directly
|
||||
},
|
||||
]),
|
||||
})),
|
||||
}
|
||||
}
|
||||
return {
|
||||
where: vi.fn().mockImplementation(() => ({
|
||||
limit: vi.fn().mockImplementation(() => []),
|
||||
})),
|
||||
}
|
||||
}),
|
||||
]
|
||||
}
|
||||
// Second call: existing schedule lookup - return existing schedule for update test
|
||||
return [
|
||||
{
|
||||
id: 'existing-schedule-id',
|
||||
workflowId: 'workflow-id',
|
||||
blockId: 'starter-id',
|
||||
cronExpression: '0 9 * * *',
|
||||
nextRunAt: new Date(),
|
||||
status: 'active',
|
||||
},
|
||||
]
|
||||
}),
|
||||
})),
|
||||
})),
|
||||
})),
|
||||
insert: vi.fn().mockImplementation(() => ({
|
||||
values: vi.fn().mockImplementation(() => ({
|
||||
@@ -176,94 +187,6 @@ describe('Schedule Configuration API Route', () => {
|
||||
// Instead, we just verify that the response has the expected properties
|
||||
})
|
||||
|
||||
/**
|
||||
* Test updating an existing schedule
|
||||
*/
|
||||
it('should update an existing schedule', async () => {
|
||||
// Setup the specific DB mock for this test
|
||||
vi.doMock('@/db', () => {
|
||||
const mockDb = {
|
||||
select: vi.fn().mockImplementation(() => ({
|
||||
from: vi.fn().mockImplementation(() => ({
|
||||
where: vi.fn().mockImplementation(() => ({
|
||||
limit: vi.fn().mockImplementation(() => [
|
||||
{
|
||||
id: 'schedule-id',
|
||||
workflowId: 'workflow-id',
|
||||
nextRunAt: new Date(),
|
||||
cronExpression: '0 9 * * *',
|
||||
},
|
||||
]),
|
||||
})),
|
||||
})),
|
||||
})),
|
||||
insert: vi.fn().mockImplementation(() => ({
|
||||
values: vi.fn().mockImplementation(() => ({
|
||||
onConflictDoUpdate: vi.fn().mockResolvedValue({}),
|
||||
})),
|
||||
})),
|
||||
delete: vi.fn().mockImplementation(() => ({
|
||||
where: vi.fn().mockResolvedValue([]),
|
||||
})),
|
||||
}
|
||||
|
||||
return { db: mockDb }
|
||||
})
|
||||
|
||||
// Create a mock request with updated schedule
|
||||
const req = createMockRequest('POST', {
|
||||
workflowId: 'workflow-id',
|
||||
state: {
|
||||
blocks: {
|
||||
'starter-id': {
|
||||
type: 'starter',
|
||||
subBlocks: {
|
||||
startWorkflow: { value: 'schedule' },
|
||||
scheduleType: { value: 'daily' },
|
||||
scheduleTime: { value: '10:30' }, // Updated time
|
||||
dailyTime: { value: '10:30' },
|
||||
},
|
||||
},
|
||||
},
|
||||
edges: [],
|
||||
loops: {},
|
||||
},
|
||||
})
|
||||
|
||||
// Override the schedule utils mock for this test
|
||||
vi.doMock('@/lib/schedules/utils', () => ({
|
||||
getScheduleTimeValues: vi.fn().mockReturnValue({
|
||||
scheduleTime: '10:30',
|
||||
dailyTime: [10, 30],
|
||||
}),
|
||||
getSubBlockValue: vi.fn().mockImplementation((block: any, id: string) => {
|
||||
const subBlocks = {
|
||||
startWorkflow: 'schedule',
|
||||
scheduleType: 'daily',
|
||||
scheduleTime: '10:30',
|
||||
dailyTime: '10:30',
|
||||
}
|
||||
return subBlocks[id as keyof typeof subBlocks] || ''
|
||||
}),
|
||||
generateCronExpression: vi.fn().mockReturnValue('0 10 * * *'),
|
||||
calculateNextRunTime: vi.fn().mockReturnValue(new Date()),
|
||||
BlockState: {},
|
||||
}))
|
||||
|
||||
// Import the route handler after mocks are set up
|
||||
const { POST } = await import('@/app/api/schedules/route')
|
||||
|
||||
// Call the handler
|
||||
const response = await POST(req)
|
||||
|
||||
// Verify response
|
||||
expect(response).toBeDefined()
|
||||
expect(response.status).toBe(200)
|
||||
|
||||
const responseData = await response.json()
|
||||
expect(responseData).toHaveProperty('message', 'Schedule updated')
|
||||
})
|
||||
|
||||
/**
|
||||
* Test removing a schedule
|
||||
*/
|
||||
|
||||
@@ -4,6 +4,7 @@ import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { createLogger } from '@/lib/logs/console/logger'
|
||||
import { getUserEntityPermissions } from '@/lib/permissions/utils'
|
||||
import {
|
||||
type BlockState,
|
||||
calculateNextRunTime,
|
||||
@@ -13,7 +14,7 @@ import {
|
||||
validateCronExpression,
|
||||
} from '@/lib/schedules/utils'
|
||||
import { db } from '@/db'
|
||||
import { workflowSchedule } from '@/db/schema'
|
||||
import { workflow, workflowSchedule } from '@/db/schema'
|
||||
|
||||
const logger = createLogger('ScheduledAPI')
|
||||
|
||||
@@ -87,6 +88,34 @@ export async function GET(req: NextRequest) {
|
||||
return NextResponse.json({ error: 'Missing workflowId parameter' }, { status: 400 })
|
||||
}
|
||||
|
||||
// Check if user has permission to view this workflow
|
||||
const [workflowRecord] = await db
|
||||
.select({ userId: workflow.userId, workspaceId: workflow.workspaceId })
|
||||
.from(workflow)
|
||||
.where(eq(workflow.id, workflowId))
|
||||
.limit(1)
|
||||
|
||||
if (!workflowRecord) {
|
||||
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
|
||||
}
|
||||
|
||||
// Check authorization - either the user owns the workflow or has workspace permissions
|
||||
let isAuthorized = workflowRecord.userId === session.user.id
|
||||
|
||||
// If not authorized by ownership and the workflow belongs to a workspace, check workspace permissions
|
||||
if (!isAuthorized && workflowRecord.workspaceId) {
|
||||
const userPermission = await getUserEntityPermissions(
|
||||
session.user.id,
|
||||
'workspace',
|
||||
workflowRecord.workspaceId
|
||||
)
|
||||
isAuthorized = userPermission !== null
|
||||
}
|
||||
|
||||
if (!isAuthorized) {
|
||||
return NextResponse.json({ error: 'Not authorized to view this workflow' }, { status: 403 })
|
||||
}
|
||||
|
||||
const now = Date.now()
|
||||
const lastLog = recentRequests.get(workflowId) || 0
|
||||
const shouldLog = now - lastLog > LOGGING_THROTTLE_MS
|
||||
@@ -152,6 +181,38 @@ export async function POST(req: NextRequest) {
|
||||
|
||||
logger.info(`[${requestId}] Processing schedule update for workflow ${workflowId}`)
|
||||
|
||||
// Check if user has permission to modify this workflow
|
||||
const [workflowRecord] = await db
|
||||
.select({ userId: workflow.userId, workspaceId: workflow.workspaceId })
|
||||
.from(workflow)
|
||||
.where(eq(workflow.id, workflowId))
|
||||
.limit(1)
|
||||
|
||||
if (!workflowRecord) {
|
||||
logger.warn(`[${requestId}] Workflow not found: ${workflowId}`)
|
||||
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
|
||||
}
|
||||
|
||||
// Check authorization - either the user owns the workflow or has write/admin workspace permissions
|
||||
let isAuthorized = workflowRecord.userId === session.user.id
|
||||
|
||||
// If not authorized by ownership and the workflow belongs to a workspace, check workspace permissions
|
||||
if (!isAuthorized && workflowRecord.workspaceId) {
|
||||
const userPermission = await getUserEntityPermissions(
|
||||
session.user.id,
|
||||
'workspace',
|
||||
workflowRecord.workspaceId
|
||||
)
|
||||
isAuthorized = userPermission === 'write' || userPermission === 'admin'
|
||||
}
|
||||
|
||||
if (!isAuthorized) {
|
||||
logger.warn(
|
||||
`[${requestId}] User not authorized to modify schedule for workflow: ${workflowId}`
|
||||
)
|
||||
return NextResponse.json({ error: 'Not authorized to modify this workflow' }, { status: 403 })
|
||||
}
|
||||
|
||||
// Find the target block - prioritize the specific blockId if provided
|
||||
let targetBlock: BlockState | undefined
|
||||
if (blockId) {
|
||||
|
||||
Reference in New Issue
Block a user