Compare commits

...

4 Commits

Author SHA1 Message Date
Vikhyath Mondreti
36612ae42a v0.5.111: non-polling webhook execs off trigger.dev, gmail subject headers, webhook trigger configs (#3530) 2026-03-11 17:47:28 -07:00
Vikhyath Mondreti
68d207df94 improvement(webhooks): move non-polling executions off trigger.dev (#3527)
* improvement(webhooks): move non-polling off trigger.dev

* restore constants file

* improve comment

* add unit test to prevent drift
2026-03-11 17:07:24 -07:00
Vikhyath Mondreti
d5502d602b feat(webhooks): dedup and custom ack configuration (#3525)
* feat(webhooks): dedup and custom ack configuration

* address review comments

* reject object typed idempotency key
2026-03-11 15:51:35 -07:00
Waleed
37d524bb0a fix(gmail): RFC 2047 encode subject headers for non-ASCII characters (#3526)
* fix(gmail): RFC 2047 encode subject headers for non-ASCII characters

* Fix RFC 2047 encoded word length limit

Split long email subjects into multiple RFC 2047 encoded words to comply with the 75-character limit per RFC 2047 Section 2. Each encoded word now contains at most 45 bytes of UTF-8 content (producing max 60 chars of base64 + 12 chars overhead = 72 total). Multiple encoded words are separated by CRLF + space (folding whitespace).

Applied via @cursor push command

* fix(gmail): split RFC 2047 encoded words on character boundaries

* fix(gmail): simplify RFC 2047 encoding to match Google's own sample

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>
2026-03-11 15:48:07 -07:00
20 changed files with 252 additions and 26 deletions

View File

@@ -367,9 +367,7 @@ export async function POST(request: NextRequest) {
)
}
// Configure each new webhook (for providers that need configuration)
const pollingProviders = ['gmail', 'outlook']
const needsConfiguration = pollingProviders.includes(provider)
const needsConfiguration = provider === 'gmail' || provider === 'outlook'
if (needsConfiguration) {
const configureFunc =

View File

@@ -18,6 +18,7 @@ export const GenericWebhookBlock: BlockConfig = {
bestPractices: `
- You can test the webhook by sending a request to the webhook URL. E.g. depending on authorization: curl -X POST http://localhost:3000/api/webhooks/trigger/d8abcf0d-1ee5-4b77-bb07-b1e8142ea4e9 -H "Content-Type: application/json" -H "X-Sim-Secret: 1234" -d '{"message": "Test webhook trigger", "data": {"key": "v"}}'
- Continuing example above, the body can be accessed in downstream block using dot notation. E.g. <webhook1.message> and <webhook1.data.key>
- To deduplicate incoming events, set the Deduplication Field to a dot-notation path of a unique field in the payload (e.g. "event.id"). Duplicate values within 7 days will be skipped.
- Only use when there's no existing integration for the service with triggerAllowed flag set to true.
`,
subBlocks: [...getTrigger('generic_webhook').subBlocks],

View File

@@ -22,7 +22,7 @@ export class TriggerBlockHandler implements BlockHandler {
}
const existingState = ctx.blockStates.get(block.id)
if (existingState?.output && Object.keys(existingState.output).length > 0) {
if (existingState?.output) {
return existingState.output
}

View File

@@ -7,6 +7,7 @@ const logger = createLogger('AsyncJobsConfig')
let cachedBackend: JobQueueBackend | null = null
let cachedBackendType: AsyncBackendType | null = null
let cachedInlineBackend: JobQueueBackend | null = null
/**
* Determines which async backend to use based on environment configuration.
@@ -71,6 +72,31 @@ export function getCurrentBackendType(): AsyncBackendType | null {
return cachedBackendType
}
/**
* Gets a job queue backend that bypasses Trigger.dev (Redis -> Database).
* Used for non-polling webhooks that should always execute inline.
*/
export async function getInlineJobQueue(): Promise<JobQueueBackend> {
if (cachedInlineBackend) {
return cachedInlineBackend
}
const redis = getRedisClient()
let type: string
if (redis) {
const { RedisJobQueue } = await import('@/lib/core/async-jobs/backends/redis')
cachedInlineBackend = new RedisJobQueue(redis)
type = 'redis'
} else {
const { DatabaseJobQueue } = await import('@/lib/core/async-jobs/backends/database')
cachedInlineBackend = new DatabaseJobQueue()
type = 'database'
}
logger.info(`Inline job backend initialized: ${type}`)
return cachedInlineBackend
}
/**
* Checks if jobs should be executed inline (fire-and-forget).
* For Redis/DB backends, we execute inline. Trigger.dev handles execution itself.
@@ -85,4 +111,5 @@ export function shouldExecuteInline(): boolean {
export function resetJobQueueCache(): void {
cachedBackend = null
cachedBackendType = null
cachedInlineBackend = null
}

View File

@@ -1,6 +1,7 @@
export {
getAsyncBackendType,
getCurrentBackendType,
getInlineJobQueue,
getJobQueue,
resetJobQueueCache,
shouldExecuteInline,

View File

@@ -413,6 +413,7 @@ export class IdempotencyService {
: undefined
const webhookIdHeader =
normalizedHeaders?.['x-sim-idempotency-key'] ||
normalizedHeaders?.['webhook-id'] ||
normalizedHeaders?.['x-webhook-id'] ||
normalizedHeaders?.['x-shopify-webhook-id'] ||

View File

@@ -5,7 +5,7 @@ import { and, eq, isNull, or } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { checkEnterprisePlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils'
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import { getInlineJobQueue, getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import { isProd } from '@/lib/core/config/feature-flags'
import { safeCompare } from '@/lib/core/security/encryption'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
@@ -29,6 +29,7 @@ import {
import { executeWebhookJob } from '@/background/webhook-execution'
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
import { isConfluencePayloadMatch } from '@/triggers/confluence/utils'
import { isPollingWebhookProvider } from '@/triggers/constants'
import { isGitHubEventMatch } from '@/triggers/github/utils'
import { isHubSpotContactEventMatch } from '@/triggers/hubspot/utils'
import { isJiraEventMatch } from '@/triggers/jira/utils'
@@ -1049,7 +1050,7 @@ export async function queueWebhookExecution(
}
}
const headers = Object.fromEntries(request.headers.entries())
const { 'x-sim-idempotency-key': _, ...headers } = Object.fromEntries(request.headers.entries())
// For Microsoft Teams Graph notifications, extract unique identifiers for idempotency
if (
@@ -1067,9 +1068,20 @@ export async function queueWebhookExecution(
}
}
// Extract credentialId from webhook config
// Note: Each webhook now has its own credentialId (credential sets are fanned out at save time)
const providerConfig = (foundWebhook.providerConfig as Record<string, any>) || {}
if (foundWebhook.provider === 'generic') {
const idempotencyField = providerConfig.idempotencyField as string | undefined
if (idempotencyField && body) {
const value = idempotencyField
.split('.')
.reduce((acc: any, key: string) => acc?.[key], body)
if (value !== undefined && value !== null && typeof value !== 'object') {
headers['x-sim-idempotency-key'] = String(value)
}
}
}
const credentialId = providerConfig.credentialId as string | undefined
// credentialSetId is a direct field on webhook table, not in providerConfig
@@ -1105,15 +1117,24 @@ export async function queueWebhookExecution(
...(credentialId ? { credentialId } : {}),
}
const jobQueue = await getJobQueue()
const jobId = await jobQueue.enqueue('webhook-execution', payload, {
metadata: { workflowId: foundWorkflow.id, userId: actorUserId },
})
logger.info(
`[${options.requestId}] Queued webhook execution task ${jobId} for ${foundWebhook.provider} webhook`
)
const isPolling = isPollingWebhookProvider(payload.provider)
if (shouldExecuteInline()) {
if (isPolling && !shouldExecuteInline()) {
const jobQueue = await getJobQueue()
const jobId = await jobQueue.enqueue('webhook-execution', payload, {
metadata: { workflowId: foundWorkflow.id, userId: actorUserId },
})
logger.info(
`[${options.requestId}] Queued polling webhook execution task ${jobId} for ${foundWebhook.provider} webhook via job queue`
)
} else {
const jobQueue = await getInlineJobQueue()
const jobId = await jobQueue.enqueue('webhook-execution', payload, {
metadata: { workflowId: foundWorkflow.id, userId: actorUserId },
})
logger.info(
`[${options.requestId}] Executing ${foundWebhook.provider} webhook ${jobId} inline`
)
void (async () => {
try {
await jobQueue.startJob(jobId)
@@ -1193,6 +1214,26 @@ export async function queueWebhookExecution(
})
}
if (foundWebhook.provider === 'generic' && providerConfig.responseMode === 'custom') {
const rawCode = Number(providerConfig.responseStatusCode) || 200
const statusCode = rawCode >= 100 && rawCode <= 599 ? rawCode : 200
const responseBody = (providerConfig.responseBody as string | undefined)?.trim()
if (!responseBody) {
return new NextResponse(null, { status: statusCode })
}
try {
const parsed = JSON.parse(responseBody)
return NextResponse.json(parsed, { status: statusCode })
} catch {
return new NextResponse(responseBody, {
status: statusCode,
headers: { 'Content-Type': 'text/plain' },
})
}
}
return NextResponse.json({ message: 'Webhook processed' })
} catch (error: any) {
logger.error(`[${options.requestId}] Failed to queue webhook execution:`, error)

View File

@@ -19,6 +19,7 @@ import {
refreshAccessTokenIfNeeded,
resolveOAuthAccountId,
} from '@/app/api/auth/oauth/utils'
import { isPollingWebhookProvider } from '@/triggers/constants'
const logger = createLogger('WebhookUtils')
@@ -2222,10 +2223,7 @@ export async function syncWebhooksForCredentialSet(params: {
`[${requestId}] Syncing webhooks for credential set ${credentialSetId}, provider ${provider}`
)
// Polling providers get unique paths per credential (for independent state)
// External webhook providers share the same path (external service sends to one URL)
const pollingProviders = ['gmail', 'outlook', 'rss', 'imap']
const useUniquePaths = pollingProviders.includes(provider)
const useUniquePaths = isPollingWebhookProvider(provider)
const credentials = await getCredentialsForCredentialSet(credentialSetId, oauthProviderId)

View File

@@ -433,7 +433,7 @@ describe('hasWorkflowChanged', () => {
expect(hasWorkflowChanged(state1, state2)).toBe(true)
})
it.concurrent('should detect subBlock type changes', () => {
it.concurrent('should ignore subBlock type changes', () => {
const state1 = createWorkflowState({
blocks: {
block1: createBlock('block1', {
@@ -448,7 +448,7 @@ describe('hasWorkflowChanged', () => {
}),
},
})
expect(hasWorkflowChanged(state1, state2)).toBe(true)
expect(hasWorkflowChanged(state1, state2)).toBe(false)
})
it.concurrent('should handle null/undefined subBlock values consistently', () => {

View File

@@ -496,7 +496,14 @@ export function normalizeSubBlockValue(subBlockId: string, value: unknown): unkn
* @returns SubBlock fields excluding value and is_diff
*/
export function extractSubBlockRest(subBlock: Record<string, unknown>): Record<string, unknown> {
const { value: _v, is_diff: _sd, ...rest } = subBlock as SubBlockWithDiffMarker
const {
value: _v,
is_diff: _sd,
type: _type,
...rest
} = subBlock as SubBlockWithDiffMarker & {
type?: unknown
}
return rest
}

View File

@@ -0,0 +1,36 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import { encodeRfc2047 } from './utils'
describe('encodeRfc2047', () => {
it('returns ASCII text unchanged', () => {
expect(encodeRfc2047('Simple ASCII Subject')).toBe('Simple ASCII Subject')
})
it('returns empty string unchanged', () => {
expect(encodeRfc2047('')).toBe('')
})
it('encodes emojis as RFC 2047 base64', () => {
const result = encodeRfc2047('Time to Stretch! 🧘')
expect(result).toBe('=?UTF-8?B?VGltZSB0byBTdHJldGNoISDwn6eY?=')
})
it('round-trips non-ASCII subjects correctly', () => {
const subjects = ['Hello 世界', 'Café résumé', '🎉🎊🎈 Party!', '今週のミーティング']
for (const subject of subjects) {
const encoded = encodeRfc2047(subject)
const match = encoded.match(/^=\?UTF-8\?B\?(.+)\?=$/)
expect(match).not.toBeNull()
const decoded = Buffer.from(match![1], 'base64').toString('utf-8')
expect(decoded).toBe(subject)
}
})
it('does not double-encode already-encoded subjects', () => {
const alreadyEncoded = '=?UTF-8?B?VGltZSB0byBTdHJldGNoISDwn6eY?='
expect(encodeRfc2047(alreadyEncoded)).toBe(alreadyEncoded)
})
})

View File

@@ -294,6 +294,19 @@ function generateBoundary(): string {
return `----=_Part_${Date.now()}_${Math.random().toString(36).substring(2, 15)}`
}
/**
* Encode a header value using RFC 2047 Base64 encoding if it contains non-ASCII characters.
* This matches Google's own Gmail API sample: `=?utf-8?B?${Buffer.from(subject).toString('base64')}?=`
* @see https://github.com/googleapis/google-api-nodejs-client/blob/main/samples/gmail/send.js
*/
export function encodeRfc2047(value: string): string {
// eslint-disable-next-line no-control-regex
if (/^[\x00-\x7F]*$/.test(value)) {
return value
}
return `=?UTF-8?B?${Buffer.from(value, 'utf-8').toString('base64')}?=`
}
/**
* Encode string or buffer to base64url format (URL-safe base64)
* Gmail API requires base64url encoding for the raw message field
@@ -333,7 +346,7 @@ export function buildSimpleEmailMessage(params: {
emailHeaders.push(`Bcc: ${bcc}`)
}
emailHeaders.push(`Subject: ${subject || ''}`)
emailHeaders.push(`Subject: ${encodeRfc2047(subject || '')}`)
if (inReplyTo) {
emailHeaders.push(`In-Reply-To: ${inReplyTo}`)
@@ -380,7 +393,7 @@ export function buildMimeMessage(params: BuildMimeMessageParams): string {
if (bcc) {
messageParts.push(`Bcc: ${bcc}`)
}
messageParts.push(`Subject: ${subject || ''}`)
messageParts.push(`Subject: ${encodeRfc2047(subject || '')}`)
if (inReplyTo) {
messageParts.push(`In-Reply-To: ${inReplyTo}`)

View File

@@ -0,0 +1,41 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import { POLLING_PROVIDERS } from '@/triggers/constants'
import { TRIGGER_REGISTRY } from '@/triggers/registry'
describe('POLLING_PROVIDERS sync with TriggerConfig.polling', () => {
it('matches every trigger with polling: true in the registry', () => {
const registryPollingProviders = new Set(
Object.values(TRIGGER_REGISTRY)
.filter((t) => t.polling === true)
.map((t) => t.provider)
)
expect(POLLING_PROVIDERS).toEqual(registryPollingProviders)
})
it('no trigger with polling: true is missing from POLLING_PROVIDERS', () => {
const missing: string[] = []
for (const trigger of Object.values(TRIGGER_REGISTRY)) {
if (trigger.polling && !POLLING_PROVIDERS.has(trigger.provider)) {
missing.push(`${trigger.id} (provider: ${trigger.provider})`)
}
}
expect(missing, `Triggers with polling: true missing from POLLING_PROVIDERS`).toEqual([])
})
it('no POLLING_PROVIDERS entry lacks a polling: true trigger in the registry', () => {
const extra: string[] = []
for (const provider of POLLING_PROVIDERS) {
const hasTrigger = Object.values(TRIGGER_REGISTRY).some(
(t) => t.provider === provider && t.polling === true
)
if (!hasTrigger) {
extra.push(provider)
}
}
expect(extra, `POLLING_PROVIDERS entries with no matching polling trigger`).toEqual([])
})
})

View File

@@ -35,3 +35,15 @@ export const TRIGGER_RUNTIME_SUBBLOCK_IDS: string[] = [
* This prevents runaway errors from continuously executing failing workflows.
*/
export const MAX_CONSECUTIVE_FAILURES = 100
/**
* Set of webhook provider names that use polling-based triggers.
* Mirrors the `polling: true` flag on TriggerConfig entries.
* Used to route execution: polling providers use the full job queue
* (Trigger.dev), non-polling providers execute inline.
*/
export const POLLING_PROVIDERS = new Set(['gmail', 'outlook', 'rss', 'imap'])
export function isPollingWebhookProvider(provider: string): boolean {
return POLLING_PROVIDERS.has(provider)
}

View File

@@ -49,6 +49,49 @@ export const genericWebhookTrigger: TriggerConfig = {
required: false,
mode: 'trigger',
},
{
id: 'idempotencyField',
title: 'Deduplication Field (Optional)',
type: 'short-input',
placeholder: 'e.g. event.id',
description:
'Dot-notation path to a unique field in the payload for deduplication. If the same value is seen within 7 days, the duplicate webhook will be skipped.',
required: false,
mode: 'trigger',
},
{
id: 'responseMode',
title: 'Acknowledgement',
type: 'dropdown',
options: [
{ label: 'Default', id: 'default' },
{ label: 'Custom', id: 'custom' },
],
defaultValue: 'default',
mode: 'trigger',
},
{
id: 'responseStatusCode',
title: 'Response Status Code',
type: 'short-input',
placeholder: '200 (default)',
description:
'HTTP status code (100599) to return to the webhook caller. Defaults to 200 if empty or invalid.',
required: false,
mode: 'trigger',
condition: { field: 'responseMode', value: 'custom' },
},
{
id: 'responseBody',
title: 'Response Body',
type: 'code',
language: 'json',
placeholder: '{"ok": true}',
description: 'JSON body to return to the webhook caller. Leave empty for no body.',
required: false,
mode: 'trigger',
condition: { field: 'responseMode', value: 'custom' },
},
{
id: 'inputFormat',
title: 'Input Format',
@@ -76,7 +119,7 @@ export const genericWebhookTrigger: TriggerConfig = {
'The webhook will receive any HTTP method (GET, POST, PUT, DELETE, etc.).',
'All request data (headers, body, query parameters) will be available in your workflow.',
'If authentication is enabled, include the token in requests using either the custom header or "Authorization: Bearer TOKEN".',
'Common fields like "event", "id", and "data" will be automatically extracted from the payload when available.',
'To deduplicate incoming events, set the Deduplication Field to the dot-notation path of a unique identifier in the payload (e.g. "event.id"). Duplicate values within 7 days will be skipped.',
]
.map(
(instruction, index) =>

View File

@@ -30,6 +30,7 @@ export const gmailPollingTrigger: TriggerConfig = {
description: 'Triggers when new emails are received in Gmail (requires Gmail credentials)',
version: '1.0.0',
icon: GmailIcon,
polling: true,
subBlocks: [
{

View File

@@ -12,6 +12,7 @@ export const imapPollingTrigger: TriggerConfig = {
description: 'Triggers when new emails are received via IMAP (works with any email provider)',
version: '1.0.0',
icon: MailServerIcon,
polling: true,
subBlocks: [
// Connection settings

View File

@@ -24,6 +24,7 @@ export const outlookPollingTrigger: TriggerConfig = {
description: 'Triggers when new emails are received in Outlook (requires Microsoft credentials)',
version: '1.0.0',
icon: OutlookIcon,
polling: true,
subBlocks: [
{

View File

@@ -8,6 +8,7 @@ export const rssPollingTrigger: TriggerConfig = {
description: 'Triggers when new items are published to an RSS feed',
version: '1.0.0',
icon: RssIcon,
polling: true,
subBlocks: [
{

View File

@@ -25,6 +25,9 @@ export interface TriggerConfig {
method?: 'POST' | 'GET' | 'PUT' | 'DELETE'
headers?: Record<string, string>
}
/** When true, this trigger is poll-based (cron-driven) rather than push-based. */
polling?: boolean
}
export interface TriggerRegistry {