mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
fix(execution): report cancellation durability truthfully (#3550)
* fix(cancel): report cancellation durability truthfully Return explicit durability results for execution cancellation so success only reflects persisted cancellation state instead of best-effort Redis availability. * fix: hoist cancellation test mocks Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> * fix(sim): harden execution cancel durability Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> * fix(sim): fallback manual cancel without redis Abort active manual SSE executions locally when Redis cannot durably record the cancellation marker so the run still finalizes as cancelled instead of completing normally. * test: mock AuthType in async execute route Keep the rebased async execute route test aligned with the current hybrid auth module exports so it exercises the queueing path instead of failing at import time. --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: test <test@example.com>
This commit is contained in:
@@ -18,6 +18,11 @@ const {
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/auth/hybrid', () => ({
|
||||
AuthType: {
|
||||
SESSION: 'session',
|
||||
API_KEY: 'api_key',
|
||||
INTERNAL_JWT: 'internal_jwt',
|
||||
},
|
||||
checkHybridAuth: mockCheckHybridAuth,
|
||||
AuthType: {
|
||||
SESSION: 'session',
|
||||
|
||||
@@ -20,6 +20,10 @@ import {
|
||||
} from '@/lib/execution/call-chain'
|
||||
import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer'
|
||||
import { processInputFileFields } from '@/lib/execution/files'
|
||||
import {
|
||||
registerManualExecutionAborter,
|
||||
unregisterManualExecutionAborter,
|
||||
} from '@/lib/execution/manual-cancellation'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import {
|
||||
@@ -845,6 +849,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
const encoder = new TextEncoder()
|
||||
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
|
||||
let isStreamClosed = false
|
||||
let isManualAbortRegistered = false
|
||||
|
||||
const eventWriter = createExecutionEventWriter(executionId)
|
||||
setExecutionMeta(executionId, {
|
||||
@@ -857,6 +862,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
async start(controller) {
|
||||
let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null
|
||||
|
||||
registerManualExecutionAborter(executionId, timeoutController.abort)
|
||||
isManualAbortRegistered = true
|
||||
|
||||
const sendEvent = (event: ExecutionEvent) => {
|
||||
if (!isStreamClosed) {
|
||||
try {
|
||||
@@ -1224,6 +1232,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
})
|
||||
finalMetaStatus = 'error'
|
||||
} finally {
|
||||
if (isManualAbortRegistered) {
|
||||
unregisterManualExecutionAborter(executionId)
|
||||
isManualAbortRegistered = false
|
||||
}
|
||||
try {
|
||||
await eventWriter.close()
|
||||
} catch (closeError) {
|
||||
|
||||
@@ -0,0 +1,148 @@
|
||||
/**
|
||||
* @vitest-environment node
|
||||
*/
|
||||
|
||||
import { NextRequest } from 'next/server'
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
const mockCheckHybridAuth = vi.fn()
|
||||
const mockAuthorizeWorkflowByWorkspacePermission = vi.fn()
|
||||
const mockMarkExecutionCancelled = vi.fn()
|
||||
const mockAbortManualExecution = vi.fn()
|
||||
|
||||
vi.mock('@sim/logger', () => ({
|
||||
createLogger: () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() }),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/auth/hybrid', () => ({
|
||||
checkHybridAuth: (...args: unknown[]) => mockCheckHybridAuth(...args),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/execution/cancellation', () => ({
|
||||
markExecutionCancelled: (...args: unknown[]) => mockMarkExecutionCancelled(...args),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/execution/manual-cancellation', () => ({
|
||||
abortManualExecution: (...args: unknown[]) => mockAbortManualExecution(...args),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/workflows/utils', () => ({
|
||||
authorizeWorkflowByWorkspacePermission: (params: unknown) =>
|
||||
mockAuthorizeWorkflowByWorkspacePermission(params),
|
||||
}))
|
||||
|
||||
import { POST } from './route'
|
||||
|
||||
describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
mockCheckHybridAuth.mockResolvedValue({ success: true, userId: 'user-1' })
|
||||
mockAuthorizeWorkflowByWorkspacePermission.mockResolvedValue({ allowed: true })
|
||||
mockAbortManualExecution.mockReturnValue(false)
|
||||
})
|
||||
|
||||
it('returns success when cancellation was durably recorded', async () => {
|
||||
mockMarkExecutionCancelled.mockResolvedValue({
|
||||
durablyRecorded: true,
|
||||
reason: 'recorded',
|
||||
})
|
||||
|
||||
const response = await POST(
|
||||
new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', {
|
||||
method: 'POST',
|
||||
}),
|
||||
{
|
||||
params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }),
|
||||
}
|
||||
)
|
||||
|
||||
expect(response.status).toBe(200)
|
||||
await expect(response.json()).resolves.toEqual({
|
||||
success: true,
|
||||
executionId: 'ex-1',
|
||||
redisAvailable: true,
|
||||
durablyRecorded: true,
|
||||
locallyAborted: false,
|
||||
reason: 'recorded',
|
||||
})
|
||||
})
|
||||
|
||||
it('returns unsuccessful response when Redis is unavailable', async () => {
|
||||
mockMarkExecutionCancelled.mockResolvedValue({
|
||||
durablyRecorded: false,
|
||||
reason: 'redis_unavailable',
|
||||
})
|
||||
|
||||
const response = await POST(
|
||||
new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', {
|
||||
method: 'POST',
|
||||
}),
|
||||
{
|
||||
params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }),
|
||||
}
|
||||
)
|
||||
|
||||
expect(response.status).toBe(200)
|
||||
await expect(response.json()).resolves.toEqual({
|
||||
success: false,
|
||||
executionId: 'ex-1',
|
||||
redisAvailable: false,
|
||||
durablyRecorded: false,
|
||||
locallyAborted: false,
|
||||
reason: 'redis_unavailable',
|
||||
})
|
||||
})
|
||||
|
||||
it('returns unsuccessful response when Redis persistence fails', async () => {
|
||||
mockMarkExecutionCancelled.mockResolvedValue({
|
||||
durablyRecorded: false,
|
||||
reason: 'redis_write_failed',
|
||||
})
|
||||
|
||||
const response = await POST(
|
||||
new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', {
|
||||
method: 'POST',
|
||||
}),
|
||||
{
|
||||
params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }),
|
||||
}
|
||||
)
|
||||
|
||||
expect(response.status).toBe(200)
|
||||
await expect(response.json()).resolves.toEqual({
|
||||
success: false,
|
||||
executionId: 'ex-1',
|
||||
redisAvailable: true,
|
||||
durablyRecorded: false,
|
||||
locallyAborted: false,
|
||||
reason: 'redis_write_failed',
|
||||
})
|
||||
})
|
||||
|
||||
it('returns success when local fallback aborts execution without Redis durability', async () => {
|
||||
mockMarkExecutionCancelled.mockResolvedValue({
|
||||
durablyRecorded: false,
|
||||
reason: 'redis_unavailable',
|
||||
})
|
||||
mockAbortManualExecution.mockReturnValue(true)
|
||||
|
||||
const response = await POST(
|
||||
new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', {
|
||||
method: 'POST',
|
||||
}),
|
||||
{
|
||||
params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }),
|
||||
}
|
||||
)
|
||||
|
||||
expect(response.status).toBe(200)
|
||||
await expect(response.json()).resolves.toEqual({
|
||||
success: true,
|
||||
executionId: 'ex-1',
|
||||
redisAvailable: false,
|
||||
durablyRecorded: false,
|
||||
locallyAborted: true,
|
||||
reason: 'redis_unavailable',
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { checkHybridAuth } from '@/lib/auth/hybrid'
|
||||
import { markExecutionCancelled } from '@/lib/execution/cancellation'
|
||||
import { abortManualExecution } from '@/lib/execution/manual-cancellation'
|
||||
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
|
||||
|
||||
const logger = createLogger('CancelExecutionAPI')
|
||||
@@ -45,20 +46,27 @@ export async function POST(
|
||||
|
||||
logger.info('Cancel execution requested', { workflowId, executionId, userId: auth.userId })
|
||||
|
||||
const marked = await markExecutionCancelled(executionId)
|
||||
const cancellation = await markExecutionCancelled(executionId)
|
||||
const locallyAborted = abortManualExecution(executionId)
|
||||
|
||||
if (marked) {
|
||||
if (cancellation.durablyRecorded) {
|
||||
logger.info('Execution marked as cancelled in Redis', { executionId })
|
||||
} else if (locallyAborted) {
|
||||
logger.info('Execution cancelled via local in-process fallback', { executionId })
|
||||
} else {
|
||||
logger.info('Redis not available, cancellation will rely on connection close', {
|
||||
logger.warn('Execution cancellation was not durably recorded', {
|
||||
executionId,
|
||||
reason: cancellation.reason,
|
||||
})
|
||||
}
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
success: cancellation.durablyRecorded || locallyAborted,
|
||||
executionId,
|
||||
redisAvailable: marked,
|
||||
redisAvailable: cancellation.reason !== 'redis_unavailable',
|
||||
durablyRecorded: cancellation.durablyRecorded,
|
||||
locallyAborted,
|
||||
reason: cancellation.reason,
|
||||
})
|
||||
} catch (error: any) {
|
||||
logger.error('Failed to cancel execution', { workflowId, executionId, error: error.message })
|
||||
|
||||
84
apps/sim/lib/execution/cancellation.test.ts
Normal file
84
apps/sim/lib/execution/cancellation.test.ts
Normal file
@@ -0,0 +1,84 @@
|
||||
import { loggerMock } from '@sim/testing'
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
const { mockGetRedisClient, mockRedisSet } = vi.hoisted(() => ({
|
||||
mockGetRedisClient: vi.fn(),
|
||||
mockRedisSet: vi.fn(),
|
||||
}))
|
||||
|
||||
vi.mock('@sim/logger', () => loggerMock)
|
||||
|
||||
vi.mock('@/lib/core/config/redis', () => ({
|
||||
getRedisClient: mockGetRedisClient,
|
||||
}))
|
||||
|
||||
import { markExecutionCancelled } from './cancellation'
|
||||
import {
|
||||
abortManualExecution,
|
||||
registerManualExecutionAborter,
|
||||
unregisterManualExecutionAborter,
|
||||
} from './manual-cancellation'
|
||||
|
||||
describe('markExecutionCancelled', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
})
|
||||
|
||||
it('returns redis_unavailable when no Redis client exists', async () => {
|
||||
mockGetRedisClient.mockReturnValue(null)
|
||||
|
||||
await expect(markExecutionCancelled('execution-1')).resolves.toEqual({
|
||||
durablyRecorded: false,
|
||||
reason: 'redis_unavailable',
|
||||
})
|
||||
})
|
||||
|
||||
it('returns recorded when Redis write succeeds', async () => {
|
||||
mockRedisSet.mockResolvedValue('OK')
|
||||
mockGetRedisClient.mockReturnValue({ set: mockRedisSet })
|
||||
|
||||
await expect(markExecutionCancelled('execution-1')).resolves.toEqual({
|
||||
durablyRecorded: true,
|
||||
reason: 'recorded',
|
||||
})
|
||||
})
|
||||
|
||||
it('returns redis_write_failed when Redis write throws', async () => {
|
||||
mockRedisSet.mockRejectedValue(new Error('set failed'))
|
||||
mockGetRedisClient.mockReturnValue({ set: mockRedisSet })
|
||||
|
||||
await expect(markExecutionCancelled('execution-1')).resolves.toEqual({
|
||||
durablyRecorded: false,
|
||||
reason: 'redis_write_failed',
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('manual execution cancellation registry', () => {
|
||||
beforeEach(() => {
|
||||
unregisterManualExecutionAborter('execution-1')
|
||||
})
|
||||
|
||||
it('aborts registered executions', () => {
|
||||
const abort = vi.fn()
|
||||
|
||||
registerManualExecutionAborter('execution-1', abort)
|
||||
|
||||
expect(abortManualExecution('execution-1')).toBe(true)
|
||||
expect(abort).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it('returns false when no execution is registered', () => {
|
||||
expect(abortManualExecution('execution-missing')).toBe(false)
|
||||
})
|
||||
|
||||
it('unregisters executions', () => {
|
||||
const abort = vi.fn()
|
||||
|
||||
registerManualExecutionAborter('execution-1', abort)
|
||||
unregisterManualExecutionAborter('execution-1')
|
||||
|
||||
expect(abortManualExecution('execution-1')).toBe(false)
|
||||
expect(abort).not.toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
@@ -6,27 +6,36 @@ const logger = createLogger('ExecutionCancellation')
|
||||
const EXECUTION_CANCEL_PREFIX = 'execution:cancel:'
|
||||
const EXECUTION_CANCEL_EXPIRY = 60 * 60
|
||||
|
||||
export type ExecutionCancellationRecordResult =
|
||||
| { durablyRecorded: true; reason: 'recorded' }
|
||||
| {
|
||||
durablyRecorded: false
|
||||
reason: 'redis_unavailable' | 'redis_write_failed'
|
||||
}
|
||||
|
||||
export function isRedisCancellationEnabled(): boolean {
|
||||
return getRedisClient() !== null
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark an execution as cancelled in Redis.
|
||||
* Returns true if Redis is available and the flag was set, false otherwise.
|
||||
* Returns whether the cancellation was durably recorded.
|
||||
*/
|
||||
export async function markExecutionCancelled(executionId: string): Promise<boolean> {
|
||||
export async function markExecutionCancelled(
|
||||
executionId: string
|
||||
): Promise<ExecutionCancellationRecordResult> {
|
||||
const redis = getRedisClient()
|
||||
if (!redis) {
|
||||
return false
|
||||
return { durablyRecorded: false, reason: 'redis_unavailable' }
|
||||
}
|
||||
|
||||
try {
|
||||
await redis.set(`${EXECUTION_CANCEL_PREFIX}${executionId}`, '1', 'EX', EXECUTION_CANCEL_EXPIRY)
|
||||
logger.info('Marked execution as cancelled', { executionId })
|
||||
return true
|
||||
return { durablyRecorded: true, reason: 'recorded' }
|
||||
} catch (error) {
|
||||
logger.error('Failed to mark execution as cancelled', { executionId, error })
|
||||
return false
|
||||
return { durablyRecorded: false, reason: 'redis_write_failed' }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
19
apps/sim/lib/execution/manual-cancellation.ts
Normal file
19
apps/sim/lib/execution/manual-cancellation.ts
Normal file
@@ -0,0 +1,19 @@
|
||||
const activeExecutionAborters = new Map<string, () => void>()
|
||||
|
||||
export function registerManualExecutionAborter(executionId: string, abort: () => void): void {
|
||||
activeExecutionAborters.set(executionId, abort)
|
||||
}
|
||||
|
||||
export function unregisterManualExecutionAborter(executionId: string): void {
|
||||
activeExecutionAborters.delete(executionId)
|
||||
}
|
||||
|
||||
export function abortManualExecution(executionId: string): boolean {
|
||||
const abort = activeExecutionAborters.get(executionId)
|
||||
if (!abort) {
|
||||
return false
|
||||
}
|
||||
|
||||
abort()
|
||||
return true
|
||||
}
|
||||
Reference in New Issue
Block a user