mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
Compare commits
3 Commits
dev
...
feat/multi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
33aa581351 | ||
|
|
bc41ea02b9 | ||
|
|
b0c0ee29a8 |
@@ -20,7 +20,7 @@ The Response block formats and sends structured HTTP responses back to API calle
|
||||
</div>
|
||||
|
||||
<Callout type="info">
|
||||
Response blocks are terminal blocks - they end workflow execution and cannot connect to other blocks.
|
||||
Response blocks are exit points — when a Response block executes, it ends the workflow and sends the HTTP response immediately. Multiple Response blocks can be placed on different branches (e.g. after a Router or Condition), but only the first one to execute determines the API response.
|
||||
</Callout>
|
||||
|
||||
## Configuration Options
|
||||
@@ -77,7 +77,11 @@ Condition (Error Detected) → Router → Response (400/500, Error Details)
|
||||
|
||||
## Outputs
|
||||
|
||||
Response blocks are terminal — no downstream blocks execute after them. However, the block does define outputs (`data`, `status`, `headers`) which are used to construct the HTTP response sent back to the API caller.
|
||||
Response blocks are exit points — when one executes, no further blocks run. The block defines outputs (`data`, `status`, `headers`) which are used to construct the HTTP response sent back to the API caller.
|
||||
|
||||
<Callout type="warning">
|
||||
If a Response block is placed on a parallel branch, there are no guarantees about whether other parallel blocks will run or not. Execution order across parallel branches is non-deterministic, so a parallel block may execute before or after the Response block on any given run. Avoid placing Response blocks in parallel with blocks that have important side effects.
|
||||
</Callout>
|
||||
|
||||
## Variable References
|
||||
|
||||
@@ -110,10 +114,10 @@ Use the `<variable.name>` syntax to dynamically insert workflow variables into y
|
||||
- **Validate variable references**: Ensure all referenced variables exist and contain the expected data types before the Response block executes
|
||||
|
||||
<FAQ items={[
|
||||
{ question: "Can I have multiple Response blocks in a workflow?", answer: "No. The Response block is a single-instance block — only one is allowed per workflow. If you need different responses for different conditions, use a Condition or Router block upstream to determine what data reaches the single Response block." },
|
||||
{ question: "Can I have multiple Response blocks in a workflow?", answer: "Yes. You can place multiple Response blocks on different branches (e.g. after a Router or Condition block). The first Response block to execute determines the API response and ends the workflow. This is useful for returning different responses based on conditions — for example, a 200 on the success branch and a 500 on the error branch." },
|
||||
{ question: "What triggers require a Response block?", answer: "The Response block is designed for use with the API Trigger. When your workflow is invoked via the API, the Response block sends the structured HTTP response back to the caller. Other trigger types (like webhooks or schedules) do not require a Response block." },
|
||||
{ question: "What is the difference between Builder and Editor mode?", answer: "Builder mode provides a visual interface for constructing your response structure with fields and types. Editor mode gives you a raw JSON code editor where you can write the response body directly. Builder mode is recommended for most use cases." },
|
||||
{ question: "What is the default status code?", answer: "If you do not specify a status code, the Response block defaults to 200 (OK). You can set any valid HTTP status code including error codes like 400, 404, or 500." },
|
||||
{ question: "Can the Response block connect to downstream blocks?", answer: "No. Response blocks are terminal — they end workflow execution and send the HTTP response. No further blocks can be connected after a Response block." },
|
||||
{ question: "Can the Response block connect to downstream blocks?", answer: "No. Response blocks are exit points — they end workflow execution and send the HTTP response. No further blocks can execute after a Response block." },
|
||||
]} />
|
||||
|
||||
|
||||
@@ -96,8 +96,9 @@ Understanding these core principles will help you build better workflows:
|
||||
2. **Automatic Parallelization**: Independent blocks run concurrently without configuration
|
||||
3. **Smart Data Flow**: Outputs flow automatically to connected blocks
|
||||
4. **Error Handling**: Failed blocks stop their execution path but don't affect independent paths
|
||||
5. **State Persistence**: All block outputs and execution details are preserved for debugging
|
||||
6. **Cycle Protection**: Workflows that call other workflows (via Workflow blocks, MCP tools, or API blocks) are tracked with a call chain. If the chain exceeds 25 hops, execution is stopped to prevent infinite loops
|
||||
5. **Response Blocks as Exit Points**: When a Response block executes, the entire workflow stops and the API response is sent immediately. Multiple Response blocks can exist on different branches — the first one to execute wins
|
||||
6. **State Persistence**: All block outputs and execution details are preserved for debugging
|
||||
7. **Cycle Protection**: Workflows that call other workflows (via Workflow blocks, MCP tools, or API blocks) are tracked with a call chain. If the chain exceeds 25 hops, execution is stopped to prevent infinite loops
|
||||
|
||||
## Next Steps
|
||||
|
||||
|
||||
65
apps/sim/background/lifecycle-email.ts
Normal file
65
apps/sim/background/lifecycle-email.ts
Normal file
@@ -0,0 +1,65 @@
|
||||
import { db } from '@sim/db'
|
||||
import { user } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { task } from '@trigger.dev/sdk'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { getEmailSubject, renderOnboardingFollowupEmail } from '@/components/emails'
|
||||
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
|
||||
import { checkEnterprisePlan } from '@/lib/billing/subscriptions/utils'
|
||||
import { sendEmail } from '@/lib/messaging/email/mailer'
|
||||
import { getPersonalEmailFrom } from '@/lib/messaging/email/utils'
|
||||
import { LIFECYCLE_EMAIL_TASK_ID, type LifecycleEmailType } from '@/lib/messaging/lifecycle'
|
||||
|
||||
const logger = createLogger('LifecycleEmail')
|
||||
|
||||
interface LifecycleEmailParams {
|
||||
userId: string
|
||||
type: LifecycleEmailType
|
||||
}
|
||||
|
||||
async function sendLifecycleEmail({ userId, type }: LifecycleEmailParams): Promise<void> {
|
||||
const [userData] = await db.select().from(user).where(eq(user.id, userId)).limit(1)
|
||||
|
||||
if (!userData?.email) {
|
||||
logger.warn('[lifecycle-email] User not found or has no email', { userId, type })
|
||||
return
|
||||
}
|
||||
|
||||
const subscription = await getHighestPrioritySubscription(userId)
|
||||
if (checkEnterprisePlan(subscription)) {
|
||||
logger.info('[lifecycle-email] Skipping lifecycle email for enterprise user', { userId, type })
|
||||
return
|
||||
}
|
||||
|
||||
const { from, replyTo } = getPersonalEmailFrom()
|
||||
|
||||
let html: string
|
||||
|
||||
switch (type) {
|
||||
case 'onboarding-followup':
|
||||
html = await renderOnboardingFollowupEmail(userData.name || undefined)
|
||||
break
|
||||
default:
|
||||
logger.warn('[lifecycle-email] Unknown lifecycle email type', { type })
|
||||
return
|
||||
}
|
||||
|
||||
await sendEmail({
|
||||
to: userData.email,
|
||||
subject: getEmailSubject(type),
|
||||
html,
|
||||
from,
|
||||
replyTo,
|
||||
emailType: 'transactional',
|
||||
})
|
||||
|
||||
logger.info('[lifecycle-email] Sent lifecycle email', { userId, type })
|
||||
}
|
||||
|
||||
export const lifecycleEmailTask = task({
|
||||
id: LIFECYCLE_EMAIL_TASK_ID,
|
||||
retry: { maxAttempts: 2 },
|
||||
run: async (params: LifecycleEmailParams) => {
|
||||
await sendLifecycleEmail(params)
|
||||
},
|
||||
})
|
||||
@@ -12,12 +12,13 @@ export const ResponseBlock: BlockConfig<ResponseBlockOutput> = {
|
||||
bestPractices: `
|
||||
- Only use this if the trigger block is the API Trigger.
|
||||
- Prefer the builder mode over the editor mode.
|
||||
- This is usually used as the last block in the workflow.
|
||||
- The Response block is an exit point. When it executes, the workflow stops and the API response is sent immediately.
|
||||
- Multiple Response blocks can be placed on different branches (e.g. after a Router or Condition). The first one to execute determines the API response and ends the workflow.
|
||||
- If a Response block is on a parallel branch, there are no guarantees about whether other parallel blocks will run. Avoid placing Response blocks in parallel with blocks that have important side effects.
|
||||
`,
|
||||
category: 'blocks',
|
||||
bgColor: '#2F55FF',
|
||||
icon: ResponseIcon,
|
||||
singleInstance: true,
|
||||
subBlocks: [
|
||||
{
|
||||
id: 'dataMode',
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
export { OnboardingFollowupEmail } from './onboarding-followup-email'
|
||||
export { OTPVerificationEmail } from './otp-verification-email'
|
||||
export { ResetPasswordEmail } from './reset-password-email'
|
||||
export { WelcomeEmail } from './welcome-email'
|
||||
|
||||
@@ -0,0 +1,57 @@
|
||||
import { Body, Head, Html, Preview, Text } from '@react-email/components'
|
||||
|
||||
interface OnboardingFollowupEmailProps {
|
||||
userName?: string
|
||||
}
|
||||
|
||||
const styles = {
|
||||
body: {
|
||||
fontFamily: '-apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif',
|
||||
backgroundColor: '#ffffff',
|
||||
margin: '0',
|
||||
padding: '0',
|
||||
},
|
||||
container: {
|
||||
maxWidth: '560px',
|
||||
margin: '40px auto',
|
||||
padding: '0 24px',
|
||||
},
|
||||
p: {
|
||||
fontSize: '15px',
|
||||
lineHeight: '1.6',
|
||||
color: '#1a1a1a',
|
||||
margin: '0 0 16px',
|
||||
},
|
||||
} as const
|
||||
|
||||
export function OnboardingFollowupEmail({ userName }: OnboardingFollowupEmailProps) {
|
||||
return (
|
||||
<Html>
|
||||
<Head />
|
||||
<Preview>Quick question</Preview>
|
||||
<Body style={styles.body}>
|
||||
<div style={styles.container}>
|
||||
<Text style={styles.p}>{userName ? `Hey ${userName},` : 'Hey,'}</Text>
|
||||
<Text style={styles.p}>
|
||||
It's been a few days since you signed up. I hope you're enjoying Sim!
|
||||
</Text>
|
||||
<Text style={styles.p}>
|
||||
I'd love to know — what did you expect when you signed up vs. what did you get?
|
||||
</Text>
|
||||
<Text style={styles.p}>
|
||||
A reply with your thoughts would really help us improve the product for everyone.
|
||||
</Text>
|
||||
<Text style={styles.p}>
|
||||
Thanks,
|
||||
<br />
|
||||
Emir
|
||||
<br />
|
||||
Founder, Sim
|
||||
</Text>
|
||||
</div>
|
||||
</Body>
|
||||
</Html>
|
||||
)
|
||||
}
|
||||
|
||||
export default OnboardingFollowupEmail
|
||||
@@ -1,5 +1,10 @@
|
||||
import { render } from '@react-email/components'
|
||||
import { OTPVerificationEmail, ResetPasswordEmail, WelcomeEmail } from '@/components/emails/auth'
|
||||
import {
|
||||
OnboardingFollowupEmail,
|
||||
OTPVerificationEmail,
|
||||
ResetPasswordEmail,
|
||||
WelcomeEmail,
|
||||
} from '@/components/emails/auth'
|
||||
import {
|
||||
CreditPurchaseEmail,
|
||||
EnterpriseSubscriptionEmail,
|
||||
@@ -159,6 +164,10 @@ export async function renderWelcomeEmail(userName?: string): Promise<string> {
|
||||
return await render(WelcomeEmail({ userName }))
|
||||
}
|
||||
|
||||
export async function renderOnboardingFollowupEmail(userName?: string): Promise<string> {
|
||||
return await render(OnboardingFollowupEmail({ userName }))
|
||||
}
|
||||
|
||||
export async function renderCreditPurchaseEmail(params: {
|
||||
userName?: string
|
||||
amount: number
|
||||
|
||||
@@ -16,6 +16,7 @@ export type EmailSubjectType =
|
||||
| 'plan-welcome-pro'
|
||||
| 'plan-welcome-team'
|
||||
| 'credit-purchase'
|
||||
| 'onboarding-followup'
|
||||
| 'welcome'
|
||||
|
||||
/**
|
||||
@@ -55,6 +56,8 @@ export function getEmailSubject(type: EmailSubjectType): string {
|
||||
return `Your Team plan is now active on ${brandName}`
|
||||
case 'credit-purchase':
|
||||
return `Credits added to your ${brandName} account`
|
||||
case 'onboarding-followup':
|
||||
return `Quick question about ${brandName}`
|
||||
case 'welcome':
|
||||
return `Welcome to ${brandName}`
|
||||
default:
|
||||
|
||||
@@ -957,6 +957,297 @@ describe('ExecutionEngine', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('Response block exit-point behavior', () => {
|
||||
it('should lock finalOutput and stop execution when a terminal Response block fires', async () => {
|
||||
const startNode = createMockNode('start', 'starter')
|
||||
const responseNode = createMockNode('response', 'response')
|
||||
|
||||
startNode.outgoingEdges.set('edge1', { target: 'response' })
|
||||
|
||||
const dag = createMockDAG([startNode, responseNode])
|
||||
const context = createMockContext()
|
||||
const edgeManager = createMockEdgeManager((node) => {
|
||||
if (node.id === 'start') return ['response']
|
||||
return []
|
||||
})
|
||||
|
||||
const nodeOrchestrator = {
|
||||
executionCount: 0,
|
||||
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
|
||||
nodeOrchestrator.executionCount++
|
||||
if (nodeId === 'response') {
|
||||
return {
|
||||
nodeId,
|
||||
output: { data: { message: 'ok' }, status: 200, headers: {} },
|
||||
isFinalOutput: true,
|
||||
}
|
||||
}
|
||||
return { nodeId, output: {}, isFinalOutput: false }
|
||||
}),
|
||||
handleNodeCompletion: vi.fn(),
|
||||
} as unknown as MockNodeOrchestrator
|
||||
|
||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||
const result = await engine.run('start')
|
||||
|
||||
expect(result.success).toBe(true)
|
||||
expect(result.output).toEqual({ data: { message: 'ok' }, status: 200, headers: {} })
|
||||
expect(nodeOrchestrator.executionCount).toBe(2)
|
||||
})
|
||||
|
||||
it('should stop execution after Response block on a branch (Router)', async () => {
|
||||
const startNode = createMockNode('start', 'starter')
|
||||
const routerNode = createMockNode('router', 'router')
|
||||
const successResponse = createMockNode('success-response', 'response')
|
||||
const errorResponse = createMockNode('error-response', 'response')
|
||||
|
||||
startNode.outgoingEdges.set('edge1', { target: 'router' })
|
||||
routerNode.outgoingEdges.set('success', { target: 'success-response' })
|
||||
routerNode.outgoingEdges.set('error', { target: 'error-response' })
|
||||
|
||||
const dag = createMockDAG([startNode, routerNode, successResponse, errorResponse])
|
||||
const context = createMockContext()
|
||||
const edgeManager = createMockEdgeManager((node) => {
|
||||
if (node.id === 'start') return ['router']
|
||||
if (node.id === 'router') return ['success-response']
|
||||
return []
|
||||
})
|
||||
|
||||
const executedNodes: string[] = []
|
||||
const nodeOrchestrator = {
|
||||
executionCount: 0,
|
||||
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
|
||||
executedNodes.push(nodeId)
|
||||
nodeOrchestrator.executionCount++
|
||||
if (nodeId === 'success-response') {
|
||||
return {
|
||||
nodeId,
|
||||
output: { data: { result: 'success' }, status: 200, headers: {} },
|
||||
isFinalOutput: true,
|
||||
}
|
||||
}
|
||||
return { nodeId, output: {}, isFinalOutput: false }
|
||||
}),
|
||||
handleNodeCompletion: vi.fn(),
|
||||
} as unknown as MockNodeOrchestrator
|
||||
|
||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||
const result = await engine.run('start')
|
||||
|
||||
expect(result.success).toBe(true)
|
||||
expect(result.output).toEqual({ data: { result: 'success' }, status: 200, headers: {} })
|
||||
expect(executedNodes).not.toContain('error-response')
|
||||
})
|
||||
|
||||
it('should stop all branches when a parallel Response block fires first', async () => {
|
||||
const startNode = createMockNode('start', 'starter')
|
||||
const responseNode = createMockNode('fast-response', 'response')
|
||||
const slowNode = createMockNode('slow-work', 'function')
|
||||
const afterSlowNode = createMockNode('after-slow', 'function')
|
||||
|
||||
startNode.outgoingEdges.set('edge1', { target: 'fast-response' })
|
||||
startNode.outgoingEdges.set('edge2', { target: 'slow-work' })
|
||||
slowNode.outgoingEdges.set('edge3', { target: 'after-slow' })
|
||||
|
||||
const dag = createMockDAG([startNode, responseNode, slowNode, afterSlowNode])
|
||||
const context = createMockContext()
|
||||
const edgeManager = createMockEdgeManager((node) => {
|
||||
if (node.id === 'start') return ['fast-response', 'slow-work']
|
||||
if (node.id === 'slow-work') return ['after-slow']
|
||||
return []
|
||||
})
|
||||
|
||||
const executedNodes: string[] = []
|
||||
const nodeOrchestrator = {
|
||||
executionCount: 0,
|
||||
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
|
||||
executedNodes.push(nodeId)
|
||||
nodeOrchestrator.executionCount++
|
||||
if (nodeId === 'fast-response') {
|
||||
return {
|
||||
nodeId,
|
||||
output: { data: { fast: true }, status: 200, headers: {} },
|
||||
isFinalOutput: true,
|
||||
}
|
||||
}
|
||||
if (nodeId === 'slow-work') {
|
||||
await new Promise((resolve) => setTimeout(resolve, 1))
|
||||
return { nodeId, output: { slow: true }, isFinalOutput: false }
|
||||
}
|
||||
return { nodeId, output: {}, isFinalOutput: true }
|
||||
}),
|
||||
handleNodeCompletion: vi.fn(),
|
||||
} as unknown as MockNodeOrchestrator
|
||||
|
||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||
const result = await engine.run('start')
|
||||
|
||||
expect(result.success).toBe(true)
|
||||
expect(result.output).toEqual({ data: { fast: true }, status: 200, headers: {} })
|
||||
expect(executedNodes).not.toContain('after-slow')
|
||||
})
|
||||
|
||||
it('should use standard finalOutput logic when no Response block exists', async () => {
|
||||
const startNode = createMockNode('start', 'starter')
|
||||
const endNode = createMockNode('end', 'function')
|
||||
startNode.outgoingEdges.set('edge1', { target: 'end' })
|
||||
|
||||
const dag = createMockDAG([startNode, endNode])
|
||||
const context = createMockContext()
|
||||
const edgeManager = createMockEdgeManager((node) => {
|
||||
if (node.id === 'start') return ['end']
|
||||
return []
|
||||
})
|
||||
|
||||
const nodeOrchestrator = {
|
||||
executionCount: 0,
|
||||
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
|
||||
nodeOrchestrator.executionCount++
|
||||
if (nodeId === 'end') {
|
||||
return { nodeId, output: { result: 'done' }, isFinalOutput: true }
|
||||
}
|
||||
return { nodeId, output: {}, isFinalOutput: false }
|
||||
}),
|
||||
handleNodeCompletion: vi.fn(),
|
||||
} as unknown as MockNodeOrchestrator
|
||||
|
||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||
const result = await engine.run('start')
|
||||
|
||||
expect(result.success).toBe(true)
|
||||
expect(result.output).toEqual({ result: 'done' })
|
||||
})
|
||||
|
||||
it('should not let a second Response block overwrite the first', async () => {
|
||||
const startNode = createMockNode('start', 'starter')
|
||||
const response1 = createMockNode('response1', 'response')
|
||||
const response2 = createMockNode('response2', 'response')
|
||||
|
||||
startNode.outgoingEdges.set('edge1', { target: 'response1' })
|
||||
startNode.outgoingEdges.set('edge2', { target: 'response2' })
|
||||
|
||||
const dag = createMockDAG([startNode, response1, response2])
|
||||
const context = createMockContext()
|
||||
const edgeManager = createMockEdgeManager((node) => {
|
||||
if (node.id === 'start') return ['response1', 'response2']
|
||||
return []
|
||||
})
|
||||
|
||||
const nodeOrchestrator = {
|
||||
executionCount: 0,
|
||||
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
|
||||
nodeOrchestrator.executionCount++
|
||||
if (nodeId === 'response1') {
|
||||
return {
|
||||
nodeId,
|
||||
output: { data: { first: true }, status: 200, headers: {} },
|
||||
isFinalOutput: true,
|
||||
}
|
||||
}
|
||||
if (nodeId === 'response2') {
|
||||
return {
|
||||
nodeId,
|
||||
output: { data: { second: true }, status: 201, headers: {} },
|
||||
isFinalOutput: true,
|
||||
}
|
||||
}
|
||||
return { nodeId, output: {}, isFinalOutput: false }
|
||||
}),
|
||||
handleNodeCompletion: vi.fn(),
|
||||
} as unknown as MockNodeOrchestrator
|
||||
|
||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||
const result = await engine.run('start')
|
||||
|
||||
expect(result.success).toBe(true)
|
||||
expect(result.output).toEqual({ data: { first: true }, status: 200, headers: {} })
|
||||
})
|
||||
|
||||
it('should not let non-Response terminals overwrite a Response block output', async () => {
|
||||
const startNode = createMockNode('start', 'starter')
|
||||
const responseNode = createMockNode('response', 'response')
|
||||
const otherTerminal = createMockNode('other', 'function')
|
||||
|
||||
startNode.outgoingEdges.set('edge1', { target: 'response' })
|
||||
startNode.outgoingEdges.set('edge2', { target: 'other' })
|
||||
|
||||
const dag = createMockDAG([startNode, responseNode, otherTerminal])
|
||||
const context = createMockContext()
|
||||
const edgeManager = createMockEdgeManager((node) => {
|
||||
if (node.id === 'start') return ['response', 'other']
|
||||
return []
|
||||
})
|
||||
|
||||
const nodeOrchestrator = {
|
||||
executionCount: 0,
|
||||
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
|
||||
nodeOrchestrator.executionCount++
|
||||
if (nodeId === 'response') {
|
||||
return {
|
||||
nodeId,
|
||||
output: { data: { response: true }, status: 200, headers: {} },
|
||||
isFinalOutput: true,
|
||||
}
|
||||
}
|
||||
if (nodeId === 'other') {
|
||||
await new Promise((resolve) => setTimeout(resolve, 1))
|
||||
return { nodeId, output: { other: true }, isFinalOutput: true }
|
||||
}
|
||||
return { nodeId, output: {}, isFinalOutput: false }
|
||||
}),
|
||||
handleNodeCompletion: vi.fn(),
|
||||
} as unknown as MockNodeOrchestrator
|
||||
|
||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||
const result = await engine.run('start')
|
||||
|
||||
expect(result.success).toBe(true)
|
||||
expect(result.output).toEqual({ data: { response: true }, status: 200, headers: {} })
|
||||
})
|
||||
|
||||
it('should honor locked Response output even when a parallel node throws an error', async () => {
|
||||
const startNode = createMockNode('start', 'starter')
|
||||
const responseNode = createMockNode('response', 'response')
|
||||
const errorNode = createMockNode('error-node', 'function')
|
||||
|
||||
startNode.outgoingEdges.set('edge1', { target: 'response' })
|
||||
startNode.outgoingEdges.set('edge2', { target: 'error-node' })
|
||||
|
||||
const dag = createMockDAG([startNode, responseNode, errorNode])
|
||||
const context = createMockContext()
|
||||
const edgeManager = createMockEdgeManager((node) => {
|
||||
if (node.id === 'start') return ['response', 'error-node']
|
||||
return []
|
||||
})
|
||||
|
||||
const nodeOrchestrator = {
|
||||
executionCount: 0,
|
||||
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
|
||||
nodeOrchestrator.executionCount++
|
||||
if (nodeId === 'response') {
|
||||
return {
|
||||
nodeId,
|
||||
output: { data: { ok: true }, status: 200, headers: {} },
|
||||
isFinalOutput: true,
|
||||
}
|
||||
}
|
||||
if (nodeId === 'error-node') {
|
||||
await new Promise((resolve) => setTimeout(resolve, 1))
|
||||
throw new Error('Parallel branch failed')
|
||||
}
|
||||
return { nodeId, output: {}, isFinalOutput: false }
|
||||
}),
|
||||
handleNodeCompletion: vi.fn(),
|
||||
} as unknown as MockNodeOrchestrator
|
||||
|
||||
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
|
||||
const result = await engine.run('start')
|
||||
|
||||
expect(result.success).toBe(true)
|
||||
expect(result.output).toEqual({ data: { ok: true }, status: 200, headers: {} })
|
||||
})
|
||||
})
|
||||
|
||||
describe('Cancellation flag behavior', () => {
|
||||
it('should set cancelledFlag when abort signal fires', async () => {
|
||||
const abortController = new AbortController()
|
||||
|
||||
@@ -23,6 +23,7 @@ export class ExecutionEngine {
|
||||
private executing = new Set<Promise<void>>()
|
||||
private queueLock = Promise.resolve()
|
||||
private finalOutput: NormalizedBlockOutput = {}
|
||||
private responseOutputLocked = false
|
||||
private pausedBlocks: Map<string, PauseMetadata> = new Map()
|
||||
private allowResumeTriggers: boolean
|
||||
private cancelledFlag = false
|
||||
@@ -127,8 +128,7 @@ export class ExecutionEngine {
|
||||
await this.waitForAllExecutions()
|
||||
}
|
||||
|
||||
// Rethrow the captured error so it's handled by the catch block
|
||||
if (this.errorFlag && this.executionError) {
|
||||
if (this.errorFlag && this.executionError && !this.responseOutputLocked) {
|
||||
throw this.executionError
|
||||
}
|
||||
|
||||
@@ -399,6 +399,12 @@ export class ExecutionEngine {
|
||||
return
|
||||
}
|
||||
|
||||
if (this.stoppedEarlyFlag && this.responseOutputLocked) {
|
||||
// Workflow already ended via Response block. Skip state persistence (setBlockOutput),
|
||||
// parallel/loop scope tracking, and edge propagation — no downstream blocks will run.
|
||||
return
|
||||
}
|
||||
|
||||
if (output._pauseMetadata) {
|
||||
const pauseMetadata = output._pauseMetadata
|
||||
this.pausedBlocks.set(pauseMetadata.contextId, pauseMetadata)
|
||||
@@ -410,7 +416,17 @@ export class ExecutionEngine {
|
||||
|
||||
await this.nodeOrchestrator.handleNodeCompletion(this.context, nodeId, output)
|
||||
|
||||
if (isFinalOutput) {
|
||||
const isResponseBlock = node.block.metadata?.id === BlockType.RESPONSE
|
||||
if (isResponseBlock) {
|
||||
if (!this.responseOutputLocked) {
|
||||
this.finalOutput = output
|
||||
this.responseOutputLocked = true
|
||||
}
|
||||
this.stoppedEarlyFlag = true
|
||||
return
|
||||
}
|
||||
|
||||
if (isFinalOutput && !this.responseOutputLocked) {
|
||||
this.finalOutput = output
|
||||
}
|
||||
|
||||
|
||||
@@ -75,6 +75,7 @@ import { processCredentialDraft } from '@/lib/credentials/draft-processor'
|
||||
import { sendEmail } from '@/lib/messaging/email/mailer'
|
||||
import { getFromEmailAddress, getPersonalEmailFrom } from '@/lib/messaging/email/utils'
|
||||
import { quickValidateEmail } from '@/lib/messaging/email/validation'
|
||||
import { scheduleLifecycleEmail } from '@/lib/messaging/lifecycle'
|
||||
import { syncAllWebhooksForCredentialSet } from '@/lib/webhooks/utils.server'
|
||||
import { SSO_TRUSTED_PROVIDERS } from '@/ee/sso/constants'
|
||||
import { createAnonymousSession, ensureAnonymousUserExists } from './anonymous'
|
||||
@@ -221,6 +222,19 @@ export const auth = betterAuth({
|
||||
error,
|
||||
})
|
||||
}
|
||||
|
||||
try {
|
||||
await scheduleLifecycleEmail({
|
||||
userId: user.id,
|
||||
type: 'onboarding-followup',
|
||||
delayDays: 5,
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
'[databaseHooks.user.create.after] Failed to schedule onboarding followup email',
|
||||
{ userId: user.id, error }
|
||||
)
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
@@ -596,6 +610,19 @@ export const auth = betterAuth({
|
||||
error,
|
||||
})
|
||||
}
|
||||
|
||||
try {
|
||||
await scheduleLifecycleEmail({
|
||||
userId: user.id,
|
||||
type: 'onboarding-followup',
|
||||
delayDays: 3,
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
'[emailVerification.onEmailVerification] Failed to schedule onboarding followup email',
|
||||
{ userId: user.id, error }
|
||||
)
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
48
apps/sim/lib/messaging/lifecycle.ts
Normal file
48
apps/sim/lib/messaging/lifecycle.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { tasks } from '@trigger.dev/sdk'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
|
||||
|
||||
const logger = createLogger('LifecycleEmail')
|
||||
|
||||
export const LIFECYCLE_EMAIL_TASK_ID = 'lifecycle-email' as const
|
||||
|
||||
/** Supported lifecycle email types. Add new types here as the sequence grows. */
|
||||
export type LifecycleEmailType = 'onboarding-followup'
|
||||
|
||||
interface ScheduleLifecycleEmailOptions {
|
||||
userId: string
|
||||
type: LifecycleEmailType
|
||||
delayDays: number
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules a lifecycle email to be sent after a delay.
|
||||
* Uses Trigger.dev's built-in delay scheduling — no polling or cron needed.
|
||||
*/
|
||||
export async function scheduleLifecycleEmail({
|
||||
userId,
|
||||
type,
|
||||
delayDays,
|
||||
}: ScheduleLifecycleEmailOptions): Promise<void> {
|
||||
if (!isTriggerDevEnabled || !env.TRIGGER_SECRET_KEY) {
|
||||
logger.info('[lifecycle] Trigger.dev not configured, skipping lifecycle email', {
|
||||
userId,
|
||||
type,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
const delayUntil = new Date(Date.now() + delayDays * 24 * 60 * 60 * 1000)
|
||||
|
||||
await tasks.trigger(
|
||||
LIFECYCLE_EMAIL_TASK_ID,
|
||||
{ userId, type },
|
||||
{
|
||||
delay: delayUntil,
|
||||
idempotencyKey: `lifecycle-${type}-${userId}`,
|
||||
}
|
||||
)
|
||||
|
||||
logger.info('[lifecycle] Scheduled lifecycle email', { userId, type, delayDays })
|
||||
}
|
||||
Reference in New Issue
Block a user