feat(gmail): added gmail polling service to trigger workflow on incoming emails (#344)

* setup gmail polling service, not tested

* general improvements to gmail polling and error handling, receive message but triggers wrong wrokflow

* finished gmail polling service, works when I send multiple emails in a single polling period (triggers the workflow for each new email)

* remove unread messages

* remove unread messages

* modified to process all incoming emails as individual workflow executions, enhance dedupe, general improvements

* replaced desc w tooltips

* added cron job for polling gmail

* remove unused props, simplify naming convention

* renoved extraneous comments, removed unused processIncomingEmails

* fixed build issues

* acknowledged PR comments
This commit is contained in:
Waleed Latif
2025-05-11 12:46:09 -07:00
committed by GitHub
parent d79cad4c52
commit 0fc0f683a6
38 changed files with 1682 additions and 156 deletions

View File

@@ -1 +1 @@
cd sim && npx lint-staged
cd apps/sim && npx lint-staged

View File

@@ -8,6 +8,14 @@ import { refreshAccessTokenIfNeeded } from '../../utils'
const logger = createLogger('GmailLabelsAPI')
interface GmailLabel {
id: string
name: string
type: 'system' | 'user'
messagesTotal?: number
messagesUnread?: number
}
export async function GET(request: NextRequest) {
const requestId = crypto.randomUUID().slice(0, 8)
@@ -57,7 +65,6 @@ export async function GET(request: NextRequest) {
}
// Fetch labels from Gmail API
logger.info(`[${requestId}] Fetching labels from Gmail API`)
const response = await fetch('https://gmail.googleapis.com/gmail/v1/users/me/labels', {
headers: {
Authorization: `Bearer ${accessToken}`,
@@ -80,12 +87,13 @@ export async function GET(request: NextRequest) {
}
const data = await response.json()
// Log the number of labels received
logger.info(`[${requestId}] Received ${data.labels?.length || 0} labels from Gmail API`)
if (!Array.isArray(data.labels)) {
logger.error(`[${requestId}] Unexpected labels response structure:`, data)
return NextResponse.json({ error: 'Invalid labels response' }, { status: 500 })
}
// Transform the labels to a more usable format
const labels = data.labels.map((label: any) => {
const labels = data.labels.map((label: GmailLabel) => {
// Format the label name with proper capitalization
let formattedName = label.name
@@ -106,7 +114,7 @@ export async function GET(request: NextRequest) {
// Filter labels if a query is provided
const filteredLabels = query
? labels.filter((label: any) =>
? labels.filter((label: GmailLabel) =>
label.name.toLowerCase().includes((query as string).toLowerCase())
)
: labels

View File

@@ -66,17 +66,20 @@ export async function POST(req: NextRequest) {
const images: { filename: string; content: Buffer; contentType: string }[] = []
for (const [key, value] of formData.entries()) {
if (key.startsWith('image_') && value instanceof Blob) {
const file = value as File
const buffer = Buffer.from(await file.arrayBuffer())
if (key.startsWith('image_') && typeof value !== 'string') {
if (value && 'arrayBuffer' in value) {
const blob = value as unknown as Blob
const buffer = Buffer.from(await blob.arrayBuffer())
const filename = 'name' in value ? (value as any).name : `image_${key.split('_')[1]}`
images.push({
filename: file.name,
filename,
content: buffer,
contentType: file.type,
contentType: 'type' in value ? (value as any).type : 'application/octet-stream',
})
}
}
}
logger.debug(`[${requestId}] Help request includes ${images.length} images`)

View File

@@ -0,0 +1,100 @@
import { NextRequest, NextResponse } from 'next/server'
import { nanoid } from 'nanoid'
import { Logger } from '@/lib/logs/console-logger'
import { pollGmailWebhooks } from '@/lib/webhooks/gmail-polling-service'
const logger = new Logger('GmailPollingAPI')
export const dynamic = 'force-dynamic'
export const maxDuration = 300 // Allow up to 5 minutes for polling to complete
interface PollingTask {
promise: Promise<any>
startedAt: number
}
const activePollingTasks = new Map<string, PollingTask>()
const STALE_TASK_THRESHOLD_MS = 10 * 60 * 1000 // 10 minutes
function cleanupStaleTasks() {
const now = Date.now()
let removedCount = 0
for (const [requestId, task] of activePollingTasks.entries()) {
if (now - task.startedAt > STALE_TASK_THRESHOLD_MS) {
activePollingTasks.delete(requestId)
removedCount++
}
}
if (removedCount > 0) {
logger.info(`Cleaned up ${removedCount} stale polling tasks`)
}
return removedCount
}
export async function GET(request: NextRequest) {
const requestId = nanoid()
logger.info(`Gmail webhook polling triggered (${requestId})`)
try {
const authHeader = request.headers.get('authorization')
const webhookSecret = process.env.WEBHOOK_POLLING_SECRET
if (!webhookSecret) {
logger.warn(`WEBHOOK_POLLING_SECRET is not set`)
return new NextResponse('Configuration error: Webhook secret is not set', { status: 500 })
}
if (!authHeader || authHeader !== `Bearer ${webhookSecret}`) {
logger.warn(`Unauthorized access attempt to Gmail polling endpoint (${requestId})`)
return new NextResponse('Unauthorized', { status: 401 })
}
cleanupStaleTasks()
const pollingTask: PollingTask = {
promise: null as any,
startedAt: Date.now(),
}
pollingTask.promise = pollGmailWebhooks()
.then((results) => {
logger.info(`Gmail polling completed successfully (${requestId})`, {
userCount: results?.total || 0,
successful: results?.successful || 0,
failed: results?.failed || 0,
})
activePollingTasks.delete(requestId)
return results
})
.catch((error) => {
logger.error(`Error in background Gmail polling task (${requestId}):`, error)
activePollingTasks.delete(requestId)
throw error
})
activePollingTasks.set(requestId, pollingTask)
return NextResponse.json({
success: true,
message: 'Gmail webhook polling started successfully',
requestId,
status: 'polling_started',
activeTasksCount: activePollingTasks.size,
})
} catch (error) {
logger.error(`Error initiating Gmail webhook polling (${requestId}):`, error)
return NextResponse.json(
{
success: false,
message: 'Failed to start Gmail webhook polling',
error: error instanceof Error ? error.message : 'Unknown error',
requestId,
},
{ status: 500 }
)
}
}

View File

@@ -182,6 +182,40 @@ export async function POST(request: NextRequest) {
}
// --- End Telegram specific logic ---
// --- Gmail webhook setup ---
if (savedWebhook && provider === 'gmail') {
logger.info(
`[${requestId}] Gmail provider detected. Setting up Gmail webhook configuration.`
)
try {
const { configureGmailPolling } = await import('@/lib/webhooks/utils')
const success = await configureGmailPolling(userId, savedWebhook, requestId)
if (!success) {
logger.error(`[${requestId}] Failed to configure Gmail polling`)
return NextResponse.json(
{
error: 'Failed to configure Gmail polling',
details: 'Please check your Gmail account permissions and try again',
},
{ status: 500 }
)
}
logger.info(`[${requestId}] Successfully configured Gmail polling`)
} catch (err) {
logger.error(`[${requestId}] Error setting up Gmail webhook configuration`, err)
return NextResponse.json(
{
error: 'Failed to configure Gmail webhook',
details: err instanceof Error ? err.message : 'Unknown error',
},
{ status: 500 }
)
}
}
// --- End Gmail specific logic ---
const status = existingWebhooks.length > 0 ? 200 : 201
return NextResponse.json({ webhook: savedWebhook }, { status })
} catch (error: any) {

View File

@@ -191,6 +191,7 @@ export async function POST(
// Detect provider type
const isAirtableWebhook = foundWebhook.provider === 'airtable'
const isGmailWebhook = foundWebhook.provider === 'gmail'
// Handle Slack challenge verification (must be done before timeout)
const slackChallengeResponse =
@@ -283,6 +284,49 @@ export async function POST(
}
}
// For Gmail: Process with specific email handling
if (isGmailWebhook) {
try {
logger.info(`[${requestId}] Gmail webhook request received for webhook: ${foundWebhook.id}`)
const webhookSecret = foundWebhook.secret
if (webhookSecret) {
const secretHeader = request.headers.get('X-Webhook-Secret')
if (secretHeader !== webhookSecret) {
logger.warn(`[${requestId}] Invalid webhook secret`)
return new NextResponse('Unauthorized', { status: 401 })
}
}
if (body.email) {
logger.info(`[${requestId}] Processing Gmail email`, {
emailId: body.email.id,
subject:
body.email?.payload?.headers?.find((h: any) => h.name === 'Subject')?.value ||
'No subject',
})
const executionId = uuidv4()
logger.info(`[${requestId}] Executing workflow ${foundWorkflow.id} for Gmail email`)
return await processWebhook(
foundWebhook,
foundWorkflow,
body,
request,
executionId,
requestId
)
} else {
logger.warn(`[${requestId}] Invalid Gmail webhook payload format`)
return new NextResponse('Invalid payload format', { status: 400 })
}
} catch (error: any) {
logger.error(`[${requestId}] Error processing Gmail webhook`, error)
return new NextResponse(`Internal server error: ${error.message}`, { status: 500 })
}
}
// --- For all other webhook types: Use async processing with timeout ---
// Create timeout promise for fast initial response (2.5 seconds)

View File

@@ -36,6 +36,7 @@ export interface OAuthRequiredModalProps {
const SCOPE_DESCRIPTIONS: Record<string, string> = {
'https://www.googleapis.com/auth/gmail.send': 'Send emails on your behalf',
'https://www.googleapis.com/auth/gmail.labels': 'View and manage your email labels',
'https://www.googleapis.com/auth/gmail.modify': 'View and manage your email messages',
// 'https://www.googleapis.com/auth/gmail.readonly': 'View and read your email messages',
// 'https://www.googleapis.com/auth/drive': 'View and manage your Google Drive files',
'https://www.googleapis.com/auth/drive.file': 'View and manage your Google Drive files',

View File

@@ -1,8 +1,11 @@
import React from 'react'
import { Info } from 'lucide-react'
import { Button } from '@/components/ui/button'
import { Input } from '@/components/ui/input'
import { Label } from '@/components/ui/label'
import { Skeleton } from '@/components/ui/skeleton'
import { Switch } from '@/components/ui/switch'
import { Tooltip, TooltipContent, TooltipTrigger } from '@/components/ui/tooltip'
import { ConfigField } from '../ui/config-field'
import { ConfigSection } from '../ui/config-section'
import { InstructionsSection } from '../ui/instructions-section'
@@ -92,13 +95,32 @@ export function AirtableConfig({
</ConfigField>
<div className="flex items-center justify-between rounded-lg border border-border p-3 shadow-sm bg-background">
<div className="space-y-0.5 pr-4">
<div className="flex items-center gap-2">
<Label htmlFor="include-cell-values" className="font-normal">
Include Full Record Data
</Label>
<p className="text-xs text-muted-foreground">
<Tooltip>
<TooltipTrigger asChild>
<Button
variant="ghost"
size="sm"
className="text-gray-500 p-1 h-6 w-6"
aria-label="Learn more about including full record data"
>
<Info className="h-4 w-4" />
</Button>
</TooltipTrigger>
<TooltipContent
side="right"
align="center"
className="max-w-[300px] p-3 z-[100]"
role="tooltip"
>
<p className="text-sm">
Enable to receive the complete record data in the payload, not just changes.
</p>
</TooltipContent>
</Tooltip>
</div>
{isLoadingToken ? (
<Skeleton className="h-5 w-9" />

View File

@@ -0,0 +1,304 @@
import { useEffect, useState } from 'react'
import { Info } from 'lucide-react'
import { GmailIcon } from '@/components/icons'
import { Badge } from '@/components/ui/badge'
import { Button } from '@/components/ui/button'
import { Checkbox } from '@/components/ui/checkbox'
import { Label } from '@/components/ui/label'
import { Notice } from '@/components/ui/notice'
import {
Select,
SelectContent,
SelectItem,
SelectTrigger,
SelectValue,
} from '@/components/ui/select'
import { Skeleton } from '@/components/ui/skeleton'
import { Tooltip, TooltipContent, TooltipTrigger } from '@/components/ui/tooltip'
import { Logger } from '@/lib/logs/console-logger'
import { JSONView } from '@/app/w/[id]/components/panel/components/console/components/json-view/json-view'
import { ConfigSection } from '../ui/config-section'
const logger = new Logger('GmailConfig')
const TOOLTIPS = {
labels: 'Select which email labels to monitor.',
labelFilter: 'Choose whether to include or exclude the selected labels.',
markAsRead: 'Emails will be marked as read after being processed by your workflow.',
}
const FALLBACK_GMAIL_LABELS = [
{ id: 'INBOX', name: 'Inbox' },
{ id: 'SENT', name: 'Sent' },
{ id: 'IMPORTANT', name: 'Important' },
{ id: 'TRASH', name: 'Trash' },
{ id: 'SPAM', name: 'Spam' },
{ id: 'STARRED', name: 'Starred' },
]
interface GmailLabel {
id: string
name: string
type?: string
messagesTotal?: number
messagesUnread?: number
}
const formatLabelName = (label: GmailLabel): string => {
let formattedName = label.name.replace(/0$/, '')
if (formattedName.startsWith('Category_')) {
return formattedName
.replace('Category_', '')
.replace(/_/g, ' ')
.replace(/\b\w/g, (c) => c.toUpperCase())
}
return formattedName
}
const exampleEmailEvent = {
email: {
id: '18e0ffabd5b5a0f4',
threadId: '18e0ffabd5b5a0f4',
subject: 'Monthly Report - April 2025',
from: 'sender@example.com',
to: 'recipient@example.com',
cc: 'team@example.com',
date: '2025-05-10T10:15:23.000Z',
bodyText:
'Hello,\n\nPlease find attached the monthly report for April 2025.\n\nBest regards,\nSender',
bodyHtml:
'<div><p>Hello,</p><p>Please find attached the monthly report for April 2025.</p><p>Best regards,<br>Sender</p></div>',
snippet: 'Hello, Please find attached the monthly report for April 2025...',
labels: ['INBOX', 'IMPORTANT'],
hasAttachments: true,
attachments: [
{
filename: 'report-april-2025.pdf',
mimeType: 'application/pdf',
size: 2048576,
},
],
},
timestamp: '2025-05-10T10:15:30.123Z',
}
interface GmailConfigProps {
selectedLabels: string[]
setSelectedLabels: (labels: string[]) => void
labelFilterBehavior: 'INCLUDE' | 'EXCLUDE'
setLabelFilterBehavior: (behavior: 'INCLUDE' | 'EXCLUDE') => void
markAsRead?: boolean
setMarkAsRead?: (markAsRead: boolean) => void
}
export function GmailConfig({
selectedLabels,
setSelectedLabels,
labelFilterBehavior,
setLabelFilterBehavior,
markAsRead = false,
setMarkAsRead = () => {},
}: GmailConfigProps) {
const [labels, setLabels] = useState<GmailLabel[]>([])
const [isLoadingLabels, setIsLoadingLabels] = useState(false)
const [labelError, setLabelError] = useState<string | null>(null)
// Fetch Gmail labels
useEffect(() => {
let mounted = true
const fetchLabels = async () => {
setIsLoadingLabels(true)
setLabelError(null)
try {
const credentialsResponse = await fetch('/api/auth/oauth/credentials?provider=google-email')
if (!credentialsResponse.ok) {
throw new Error('Failed to get Google credentials')
}
const credentialsData = await credentialsResponse.json()
if (!credentialsData.credentials || !credentialsData.credentials.length) {
throw new Error('No Google credentials found')
}
const credentialId = credentialsData.credentials[0].id
const response = await fetch(`/api/auth/oauth/gmail/labels?credentialId=${credentialId}`)
if (!response.ok) {
throw new Error('Failed to fetch Gmail labels')
}
const data = await response.json()
if (data.labels && Array.isArray(data.labels)) {
if (mounted) setLabels(data.labels)
} else {
throw new Error('Invalid labels data format')
}
} catch (error) {
logger.error('Error fetching Gmail labels:', error)
if (mounted) {
setLabelError('Could not fetch Gmail labels. Using default labels instead.')
setLabels(FALLBACK_GMAIL_LABELS)
}
} finally {
if (mounted) setIsLoadingLabels(false)
}
}
fetchLabels()
return () => {
mounted = false
}
}, [])
const toggleLabel = (labelId: string) => {
if (selectedLabels.includes(labelId)) {
setSelectedLabels(selectedLabels.filter((id) => id !== labelId))
} else {
setSelectedLabels([...selectedLabels, labelId])
}
}
return (
<div className="space-y-6">
<ConfigSection>
<div className="flex items-center gap-2 mb-3">
<h3 className="text-sm font-medium">Email Labels to Monitor</h3>
<Tooltip>
<TooltipTrigger asChild>
<Button
variant="ghost"
size="sm"
className="text-gray-500 p-1 h-6 w-6"
aria-label="Learn more about email labels"
>
<Info className="h-4 w-4" />
</Button>
</TooltipTrigger>
<TooltipContent
side="right"
align="center"
className="max-w-[300px] p-3 z-[100]"
role="tooltip"
>
<p className="text-sm">{TOOLTIPS.labels}</p>
</TooltipContent>
</Tooltip>
</div>
{isLoadingLabels ? (
<div className="flex flex-wrap gap-2 py-2">
{Array(5)
.fill(0)
.map((_, i) => (
<Skeleton key={i} className="h-6 w-16 rounded-full" />
))}
</div>
) : (
<>
{labelError && (
<p className="text-sm text-amber-500 dark:text-amber-400">{labelError}</p>
)}
<div className="flex flex-wrap gap-2 mt-2">
{labels.map((label) => (
<Badge
key={label.id}
variant={selectedLabels.includes(label.id) ? 'default' : 'outline'}
className="cursor-pointer"
onClick={() => toggleLabel(label.id)}
>
{formatLabelName(label)}
</Badge>
))}
</div>
</>
)}
<div className="mt-4">
<div className="flex items-center gap-2">
<Label htmlFor="label-behavior" className="text-sm font-medium">
Label Filter Behavior
</Label>
<Tooltip>
<TooltipTrigger asChild>
<Button
variant="ghost"
size="sm"
className="text-gray-500 p-1 h-6 w-6"
aria-label="Learn more about label filter behavior"
>
<Info className="h-4 w-4" />
</Button>
</TooltipTrigger>
<TooltipContent
side="right"
align="center"
className="max-w-[300px] p-3 z-[100]"
role="tooltip"
>
<p className="text-sm">{TOOLTIPS.labelFilter}</p>
</TooltipContent>
</Tooltip>
</div>
<div className="mt-1">
<Select value={labelFilterBehavior} onValueChange={setLabelFilterBehavior}>
<SelectTrigger id="label-behavior" className="w-full">
<SelectValue placeholder="Select behavior" />
</SelectTrigger>
<SelectContent>
<SelectItem value="INCLUDE">Include selected labels</SelectItem>
<SelectItem value="EXCLUDE">Exclude selected labels</SelectItem>
</SelectContent>
</Select>
</div>
</div>
</ConfigSection>
<ConfigSection>
<h3 className="text-sm font-medium mb-3">Email Processing Options</h3>
<div className="space-y-3">
<div className="flex items-center">
<div className="flex items-center gap-2 flex-1">
<Checkbox
id="mark-as-read"
checked={markAsRead}
onCheckedChange={(checked) => setMarkAsRead(checked as boolean)}
/>
<Label htmlFor="mark-as-read" className="text-sm font-normal cursor-pointer">
Mark emails as read after processing
</Label>
<Tooltip>
<TooltipTrigger asChild>
<Button
variant="ghost"
size="sm"
className="text-gray-500 p-1 h-6 w-6"
aria-label="Learn more about marking emails as read"
>
<Info className="h-4 w-4" />
</Button>
</TooltipTrigger>
<TooltipContent side="top" align="center" className="max-w-[300px] p-3 z-[100]">
<p className="text-sm">{TOOLTIPS.markAsRead}</p>
</TooltipContent>
</Tooltip>
</div>
</div>
</div>
</ConfigSection>
<Notice
variant="default"
className="bg-white border-slate-200 dark:bg-background dark:border-border"
icon={<GmailIcon className="h-5 w-5 text-red-500 mt-0.5 mr-3.5 flex-shrink-0" />}
title="Gmail Event Payload Example"
>
<div className="mt-2 text-sm font-mono break-normal whitespace-normal overflow-wrap-anywhere">
<JSONView data={exampleEmailEvent} />
</div>
</Notice>
</div>
)
}

View File

@@ -149,7 +149,7 @@ export function SlackConfig({
>
Your workflow will receive a payload similar to this when a subscribed event occurs:
<div className="mt-2 text-sm font-mono break-normal whitespace-normal overflow-wrap-anywhere">
<JSONView data={JSON.parse(exampleEvent)} initiallyExpanded={true} />
<JSONView data={JSON.parse(exampleEvent)} />
</div>
</Notice>
</div>

View File

@@ -1,5 +1,8 @@
import React from 'react'
import { Info } from 'lucide-react'
import { Button } from '@/components/ui/button'
import { Label } from '@/components/ui/label'
import { Tooltip, TooltipContent, TooltipTrigger } from '@/components/ui/tooltip'
interface ConfigFieldProps {
id: string
@@ -12,9 +15,32 @@ interface ConfigFieldProps {
export function ConfigField({ id, label, description, children, className }: ConfigFieldProps) {
return (
<div className={`space-y-2 ${className || ''}`}>
<div className="flex items-center gap-2">
<Label htmlFor={id}>{label}</Label>
{description && (
<Tooltip>
<TooltipTrigger asChild>
<Button
variant="ghost"
size="sm"
className="text-gray-500 p-1 h-6 w-6"
aria-label={`Learn more about ${label?.toString() || id}`}
>
<Info className="h-4 w-4" />
</Button>
</TooltipTrigger>
<TooltipContent
side="right"
align="center"
className="max-w-[300px] p-3 z-[100]"
role="tooltip"
>
<p className="text-sm">{description}</p>
</TooltipContent>
</Tooltip>
)}
</div>
{children} {/* The actual input/select/checkbox goes here */}
{description && <p className="text-xs text-muted-foreground">{description}</p>}
</div>
)
}

View File

@@ -1,8 +1,9 @@
import { useState } from 'react'
import { Check, Copy, Eye, EyeOff } from 'lucide-react'
import { Check, Copy, Eye, EyeOff, Info } from 'lucide-react'
import { Button } from '@/components/ui/button'
import { Input } from '@/components/ui/input'
import { Label } from '@/components/ui/label'
import { Tooltip, TooltipContent, TooltipTrigger } from '@/components/ui/tooltip'
import { cn } from '@/lib/utils'
interface WebhookConfigFieldProps {
@@ -46,9 +47,33 @@ export function WebhookConfigField({
return (
<div className={cn('space-y-1 mb-4', className)}>
<div className="flex items-center gap-2">
<Label htmlFor={id} className="text-sm font-medium">
{label}
</Label>
{description && (
<Tooltip>
<TooltipTrigger asChild>
<Button
variant="ghost"
size="sm"
className="text-gray-500 p-1 h-6 w-6"
aria-label={`Learn more about ${label}`}
>
<Info className="h-4 w-4" />
</Button>
</TooltipTrigger>
<TooltipContent
side="right"
align="center"
className="max-w-[300px] p-3 z-[100]"
role="tooltip"
>
<p className="text-sm">{description}</p>
</TooltipContent>
</Tooltip>
)}
</div>
<div className="flex">
<div className={cn('flex-1 relative')}>
<Input
@@ -99,7 +124,6 @@ export function WebhookConfigField({
)}
</Button>
</div>
{description && <p className="text-xs text-muted-foreground mt-1">{description}</p>}
</div>
)
}

View File

@@ -18,6 +18,7 @@ interface WebhookDialogFooterProps {
export function WebhookDialogFooter({
webhookId,
webhookProvider,
isSaving,
isDeleting,
isLoadingToken,
@@ -45,12 +46,12 @@ export function WebhookDialogFooter({
) : (
<Trash2 className="h-4 w-4 mr-2" />
)}
{isDeleting ? 'Deleting...' : 'Delete Webhook'}
{isDeleting ? 'Deleting...' : 'Delete'}
</Button>
)}
</div>
<div className="flex gap-2">
{webhookId && (
{webhookId && webhookProvider !== 'gmail' && (
<Button
type="button"
variant="outline"

View File

@@ -1,8 +1,8 @@
import { useState } from 'react'
import { CheckCheck, Copy } from 'lucide-react'
import { CheckCheck, Copy, Info } from 'lucide-react'
import { Button } from '@/components/ui/button'
import { Input } from '@/components/ui/input'
import { Label } from '@/components/ui/label'
import { Tooltip, TooltipContent, TooltipTrigger } from '@/components/ui/tooltip'
import { cn } from '@/lib/utils'
interface WebhookUrlFieldProps {
@@ -20,9 +20,31 @@ export function WebhookUrlField({
}: WebhookUrlFieldProps) {
return (
<div className="space-y-1 mb-4">
<div className="flex items-center gap-2">
<Label htmlFor="webhook-url" className="text-sm font-medium">
Webhook URL
</Label>
<Tooltip>
<TooltipTrigger asChild>
<Button
variant="ghost"
size="sm"
className="text-gray-500 p-1 h-6 w-6"
aria-label="Learn more about webhook URL"
>
<Info className="h-4 w-4" />
</Button>
</TooltipTrigger>
<TooltipContent
side="right"
align="center"
className="max-w-[300px] p-3 z-[100]"
role="tooltip"
>
<p className="text-sm">URL that will receive webhook requests</p>
</TooltipContent>
</Tooltip>
</div>
<div className="flex">
<Input
id="webhook-url"
@@ -50,9 +72,6 @@ export function WebhookUrlField({
)}
</Button>
</div>
<p className="text-xs text-muted-foreground mt-1">
This is the URL that will receive webhook requests
</p>
</div>
)
}

View File

@@ -9,16 +9,16 @@ import {
DialogTitle,
} from '@/components/ui/dialog'
import { createLogger } from '@/lib/logs/console-logger'
import { cn } from '@/lib/utils'
import { ProviderConfig, WEBHOOK_PROVIDERS } from '../webhook-config'
import { AirtableConfig } from './providers/airtable-config'
import { DiscordConfig } from './providers/discord-config'
import { GenericConfig } from './providers/generic-config'
import { GithubConfig } from './providers/github-config'
import { SlackConfig } from './providers/slack-config'
import { StripeConfig } from './providers/stripe-config'
import { TelegramConfig } from './providers/telegram-config'
import { WhatsAppConfig } from './providers/whatsapp-config'
import { ProviderConfig, WEBHOOK_PROVIDERS } from '../webhook'
import { AirtableConfig } from './providers/airtable'
import { DiscordConfig } from './providers/discord'
import { GenericConfig } from './providers/generic'
import { GithubConfig } from './providers/github'
import { GmailConfig } from './providers/gmail'
import { SlackConfig } from './providers/slack'
import { StripeConfig } from './providers/stripe'
import { TelegramConfig } from './providers/telegram'
import { WhatsAppConfig } from './providers/whatsapp'
import { DeleteConfirmDialog } from './ui/confirmation'
import { UnsavedChangesDialog } from './ui/confirmation'
import { WebhookDialogFooter } from './ui/webhook-footer'
@@ -87,8 +87,11 @@ export function WebhookModal({
const [airtableTableId, setAirtableTableId] = useState('')
const [airtableIncludeCellValues, setAirtableIncludeCellValues] = useState(false)
// Original values to track changes
// State for storing initial values to detect changes
const [originalValues, setOriginalValues] = useState({
webhookProvider,
webhookPath,
slackSigningSecret: '',
whatsappVerificationToken: '',
githubContentType: 'application/json',
generalToken: '',
@@ -97,15 +100,21 @@ export function WebhookModal({
allowedIps: '',
discordWebhookName: '',
discordAvatarUrl: '',
slackSigningSecret: '',
airtableWebhookSecret: '',
airtableBaseId: '',
airtableTableId: '',
airtableIncludeCellValues: false,
telegramBotToken: '',
telegramTriggerPhrase: '',
selectedLabels: ['INBOX'] as string[],
labelFilterBehavior: 'INCLUDE',
markAsRead: false,
})
const [selectedLabels, setSelectedLabels] = useState<string[]>(['INBOX'])
const [labelFilterBehavior, setLabelFilterBehavior] = useState<'INCLUDE' | 'EXCLUDE'>('INCLUDE')
const [markAsRead, setMarkAsRead] = useState<boolean>(false)
// Get the current provider configuration
const provider = WEBHOOK_PROVIDERS[webhookProvider] || WEBHOOK_PROVIDERS.generic
@@ -229,6 +238,23 @@ export function WebhookModal({
telegramBotToken: botToken,
telegramTriggerPhrase: triggerPhrase,
}))
} else if (webhookProvider === 'gmail') {
const labelIds = config.labelIds || []
const labelFilterBehavior = config.labelFilterBehavior || 'INCLUDE'
setSelectedLabels(labelIds)
setLabelFilterBehavior(labelFilterBehavior)
setOriginalValues((prev) => ({
...prev,
selectedLabels: labelIds,
labelFilterBehavior,
}))
if (config.markAsRead !== undefined) {
setMarkAsRead(config.markAsRead)
setOriginalValues((prev) => ({ ...prev, markAsRead: config.markAsRead }))
}
}
}
}
@@ -269,7 +295,12 @@ export function WebhookModal({
airtableIncludeCellValues !== originalValues.airtableIncludeCellValues)) ||
(webhookProvider === 'telegram' &&
(telegramBotToken !== originalValues.telegramBotToken ||
telegramTriggerPhrase !== originalValues.telegramTriggerPhrase))
telegramTriggerPhrase !== originalValues.telegramTriggerPhrase)) ||
(webhookProvider === 'gmail' &&
(!selectedLabels.every((label) => originalValues.selectedLabels.includes(label)) ||
!originalValues.selectedLabels.every((label) => selectedLabels.includes(label)) ||
labelFilterBehavior !== originalValues.labelFilterBehavior ||
markAsRead !== originalValues.markAsRead))
setHasUnsavedChanges(hasChanges)
}, [
@@ -290,6 +321,9 @@ export function WebhookModal({
airtableIncludeCellValues,
telegramBotToken,
telegramTriggerPhrase,
selectedLabels,
labelFilterBehavior,
markAsRead,
])
// Validate required fields for current provider
@@ -314,6 +348,9 @@ export function WebhookModal({
case 'telegram':
isValid = telegramBotToken.trim() !== '' && telegramTriggerPhrase.trim() !== ''
break
case 'gmail':
isValid = selectedLabels.length > 0
break
}
setIsCurrentConfigValid(isValid)
}, [
@@ -324,6 +361,7 @@ export function WebhookModal({
whatsappVerificationToken,
telegramBotToken,
telegramTriggerPhrase,
selectedLabels,
])
// Use the provided path or generate a UUID-based path
@@ -356,6 +394,13 @@ export function WebhookModal({
}
case 'stripe':
return {}
case 'gmail':
return {
labelIds: selectedLabels,
labelFilterBehavior,
markAsRead,
maxEmailsPerPoll: 25,
}
case 'generic':
// Parse the allowed IPs into an array
const parsedIps = allowedIps
@@ -418,6 +463,8 @@ export function WebhookModal({
if (saveSuccessful) {
setOriginalValues({
webhookProvider,
webhookPath,
whatsappVerificationToken,
githubContentType,
generalToken,
@@ -433,6 +480,9 @@ export function WebhookModal({
airtableIncludeCellValues,
telegramBotToken,
telegramTriggerPhrase,
selectedLabels,
labelFilterBehavior,
markAsRead,
})
setHasUnsavedChanges(false)
setTestResult({
@@ -593,6 +643,17 @@ export function WebhookModal({
testWebhook={testWebhook}
/>
)
case 'gmail':
return (
<GmailConfig
selectedLabels={selectedLabels}
setSelectedLabels={setSelectedLabels}
labelFilterBehavior={labelFilterBehavior}
setLabelFilterBehavior={setLabelFilterBehavior}
markAsRead={markAsRead}
setMarkAsRead={setMarkAsRead}
/>
)
case 'discord':
return (
<DiscordConfig

View File

@@ -1,10 +1,11 @@
import { useEffect, useState } from 'react'
import { useParams } from 'next/navigation'
import { CheckCircle2, ExternalLink } from 'lucide-react'
import { ExternalLink } from 'lucide-react'
import {
AirtableIcon,
DiscordIcon,
GithubIcon,
GmailIcon,
SlackIcon,
StripeIcon,
TelegramIcon,
@@ -15,6 +16,7 @@ import { createLogger } from '@/lib/logs/console-logger'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
import { useSubBlockValue } from '../../hooks/use-sub-block-value'
import { CredentialSelector } from '../credential-selector/credential-selector'
import { WebhookModal } from './components/webhook-modal'
const logger = createLogger('WebhookConfig')
@@ -64,6 +66,10 @@ export interface SlackConfig {
signingSecret: string
}
export interface GmailConfig {
credentialId: string
}
// Define Airtable-specific configuration type
export interface AirtableWebhookConfig {
baseId: string
@@ -87,6 +93,7 @@ export type ProviderConfig =
| SlackConfig
| AirtableWebhookConfig
| TelegramConfig
| GmailConfig
| Record<string, never>
// Define available webhook providers
@@ -118,6 +125,38 @@ export const WEBHOOK_PROVIDERS: { [key: string]: WebhookProvider } = {
},
},
},
gmail: {
id: 'gmail',
name: 'Gmail',
icon: (props) => <GmailIcon {...props} />,
configFields: {
labelFilterBehavior: {
type: 'select',
label: 'Label Filter Behavior',
options: ['INCLUDE', 'EXCLUDE'],
defaultValue: 'INCLUDE',
description: 'Whether to include or exclude the selected labels.',
},
markAsRead: {
type: 'boolean',
label: 'Mark As Read',
defaultValue: false,
description: 'Mark emails as read after processing.',
},
maxEmailsPerPoll: {
type: 'string',
label: 'Max Emails Per Poll',
defaultValue: '10',
description: 'Maximum number of emails to process in each check.',
},
pollingInterval: {
type: 'string',
label: 'Polling Interval (minutes)',
defaultValue: '5',
description: 'How often to check for new emails.',
},
},
},
discord: {
id: 'discord',
name: 'Discord',
@@ -256,6 +295,7 @@ export function WebhookConfig({ blockId, subBlockId, isConnecting }: WebhookConf
const params = useParams()
const workflowId = params.id as string
const [isLoading, setIsLoading] = useState(false)
const [gmailCredentialId, setGmailCredentialId] = useState<string>('')
// Get workflow store function to update webhook status
const setWebhookStatus = useWorkflowStore((state) => state.setWebhookStatus)
@@ -352,8 +392,16 @@ export function WebhookConfig({ blockId, subBlockId, isConnecting }: WebhookConf
setWebhookPath(path)
}
let finalConfig = config
if (webhookProvider === 'gmail' && gmailCredentialId) {
finalConfig = {
...config,
credentialId: gmailCredentialId,
}
}
// Set the provider config in the block state
setProviderConfig(config)
setProviderConfig(finalConfig)
// Save the webhook to the database
const response = await fetch('/api/webhooks', {
@@ -365,7 +413,7 @@ export function WebhookConfig({ blockId, subBlockId, isConnecting }: WebhookConf
workflowId,
path,
provider: webhookProvider || 'generic',
providerConfig: config,
providerConfig: finalConfig,
}),
})
@@ -472,6 +520,62 @@ export function WebhookConfig({ blockId, subBlockId, isConnecting }: WebhookConf
// Check if the webhook is connected for the selected provider
const isWebhookConnected = webhookId && webhookProvider === actualProvider
const handleCredentialChange = (credentialId: string) => {
setGmailCredentialId(credentialId)
}
// For Gmail, we need to show the credential selector
if (webhookProvider === 'gmail' && !isWebhookConnected) {
return (
<div className="w-full">
{error && <div className="text-sm text-red-500 dark:text-red-400 mb-2">{error}</div>}
<div className="mb-3">
<CredentialSelector
value={gmailCredentialId}
onChange={handleCredentialChange}
provider="google-email"
requiredScopes={[
'https://www.googleapis.com/auth/gmail.modify',
'https://www.googleapis.com/auth/gmail.labels',
]}
label="Select Gmail account"
disabled={isConnecting || isSaving || isDeleting}
/>
</div>
{gmailCredentialId && (
<Button
variant="outline"
size="sm"
className="w-full h-10 text-sm font-normal bg-background flex items-center"
onClick={handleOpenModal}
disabled={isConnecting || isSaving || isDeleting || !gmailCredentialId}
>
{isLoading ? (
<div className="h-4 w-4 mr-2 animate-spin rounded-full border-[1.5px] border-current border-t-transparent" />
) : (
<ExternalLink className="h-4 w-4 mr-2" />
)}
Configure Webhook
</Button>
)}
{isModalOpen && (
<WebhookModal
isOpen={isModalOpen}
onClose={handleCloseModal}
webhookPath={webhookPath || ''}
webhookProvider={webhookProvider || 'generic'}
onSave={handleSaveWebhook}
onDelete={handleDeleteWebhook}
webhookId={webhookId || undefined}
/>
)}
</div>
)
}
return (
<div className="w-full">
{error && <div className="text-sm text-red-500 dark:text-red-400 mb-2">{error}</div>}

View File

@@ -25,7 +25,7 @@ import { Switch } from './components/switch'
import { Table } from './components/table'
import { TimeInput } from './components/time-input'
import { ToolInput } from './components/tool-input/tool-input'
import { WebhookConfig } from './components/webhook/webhook-config'
import { WebhookConfig } from './components/webhook/webhook'
interface SubBlockProps {
blockId: string

View File

@@ -291,7 +291,6 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
const showScheduleIndicator = isStarterBlock && hasActiveSchedule
const showWebhookIndicator = isStarterBlock && hasActiveWebhook
// Helper function to get provider name - only create once
const getProviderName = (providerId: string): string => {
const providers: Record<string, string> = {
whatsapp: 'WhatsApp',
@@ -301,6 +300,7 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
generic: 'General',
slack: 'Slack',
airtable: 'Airtable',
gmail: 'Gmail',
}
return providers[providerId] || 'Webhook'
}

View File

@@ -34,6 +34,7 @@ export const GmailBlock: BlockConfig<GmailToolResponse> = {
serviceId: 'gmail',
requiredScopes: [
'https://www.googleapis.com/auth/gmail.send',
'https://www.googleapis.com/auth/gmail.modify',
// 'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/gmail.labels',
],

View File

@@ -47,6 +47,7 @@ export const StarterBlock: BlockConfig<StarterBlockOutput> = {
layout: 'full',
options: [
{ label: 'Slack', id: 'slack' },
{ label: 'Gmail', id: 'gmail' },
{ label: 'Airtable', id: 'airtable' },
{ label: 'Telegram', id: 'telegram' },
{ label: 'Generic', id: 'generic' },

View File

@@ -62,7 +62,7 @@ vi.mock('@/blocks/blocks/router')
vi.mock('@/blocks')
// Mock fetch for server requests
global.fetch = vi.fn()
global.fetch = Object.assign(vi.fn(), { preconnect: vi.fn() }) as typeof fetch
// Mock process.env
process.env.NEXT_PUBLIC_APP_URL = 'http://localhost:3000'

View File

@@ -31,14 +31,14 @@ vi.mock('@/tools', () => ({
executeTool: vi.fn(),
}))
global.fetch = vi.fn()
global.fetch = Object.assign(vi.fn(), { preconnect: vi.fn() }) as typeof fetch
const mockGetAllBlocks = getAllBlocks as Mock
const mockExecuteTool = executeTool as Mock
const mockIsHosted = isHosted as unknown as Mock
const mockGetProviderFromModel = getProviderFromModel as Mock
const mockTransformBlockTool = transformBlockTool as Mock
const mockFetch = global.fetch as Mock
const mockFetch = global.fetch as unknown as Mock
describe('AgentBlockHandler', () => {
let handler: AgentBlockHandler

View File

@@ -7,7 +7,7 @@ import { ExecutionContext } from '../../types'
import { EvaluatorBlockHandler } from './evaluator-handler'
const mockGetProviderFromModel = getProviderFromModel as Mock
const mockFetch = global.fetch as Mock
const mockFetch = global.fetch as unknown as Mock
describe('EvaluatorBlockHandler', () => {
let handler: EvaluatorBlockHandler

View File

@@ -11,7 +11,7 @@ import { RouterBlockHandler } from './router-handler'
const mockGenerateRouterPrompt = generateRouterPrompt as Mock
const mockGetProviderFromModel = getProviderFromModel as Mock
const MockPathTracker = PathTracker as MockedClass<typeof PathTracker>
const mockFetch = global.fetch as Mock
const mockFetch = global.fetch as unknown as Mock
describe('RouterBlockHandler', () => {
let handler: RouterBlockHandler

View File

@@ -298,6 +298,7 @@ export const auth = betterAuth({
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/userinfo.profile',
'https://www.googleapis.com/auth/gmail.send',
'https://www.googleapis.com/auth/gmail.modify',
// 'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/gmail.labels',
],

View File

@@ -79,6 +79,7 @@ export const OAUTH_PROVIDERS: Record<string, OAuthProviderConfig> = {
baseProviderIcon: (props) => GoogleIcon(props),
scopes: [
'https://www.googleapis.com/auth/gmail.send',
'https://www.googleapis.com/auth/gmail.modify',
// 'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/gmail.labels',
],

View File

@@ -0,0 +1,654 @@
import { and, eq } from 'drizzle-orm'
import { nanoid } from 'nanoid'
import { Logger } from '@/lib/logs/console-logger'
import { getBaseUrl } from '@/lib/urls/utils'
import { getOAuthToken } from '@/app/api/auth/oauth/utils'
import { db } from '@/db'
import { webhook } from '@/db/schema'
const logger = new Logger('GmailPollingService')
interface GmailWebhookConfig {
labelIds: string[]
labelFilterBehavior: 'INCLUDE' | 'EXCLUDE'
markAsRead: boolean
maxEmailsPerPoll?: number
lastCheckedTimestamp?: string
historyId?: string
processedEmailIds?: string[]
pollingInterval?: number
}
interface GmailEmail {
id: string
threadId: string
historyId?: string
labelIds?: string[]
payload?: any
snippet?: string
internalDate?: string
}
export async function pollGmailWebhooks() {
logger.info('Starting Gmail webhook polling')
try {
// Get all active Gmail webhooks
const activeWebhooks = await db
.select()
.from(webhook)
.where(and(eq(webhook.provider, 'gmail'), eq(webhook.isActive, true)))
if (!activeWebhooks.length) {
logger.info('No active Gmail webhooks found')
return { total: 0, successful: 0, failed: 0, details: [] }
}
logger.info(`Found ${activeWebhooks.length} active Gmail webhooks`)
const results = await Promise.allSettled(
activeWebhooks.map(async (webhookData) => {
const webhookId = webhookData.id
const requestId = nanoid()
try {
// Extract user ID from webhook metadata if available
const metadata = webhookData.providerConfig as any
const userId = metadata?.userId
if (!userId) {
logger.error(`[${requestId}] No user ID found for webhook ${webhookId}`)
return { success: false, webhookId, error: 'No user ID' }
}
// Get OAuth token for Gmail API
const accessToken = await getOAuthToken(userId, 'google-email')
if (!accessToken) {
logger.error(`[${requestId}] Failed to get Gmail access token for webhook ${webhookId}`)
return { success: false, webhookId, error: 'No access token' }
}
// Get webhook configuration
const config = webhookData.providerConfig as unknown as GmailWebhookConfig
const now = new Date()
// Fetch new emails
const fetchResult = await fetchNewEmails(accessToken, config, requestId)
const { emails, latestHistoryId } = fetchResult
if (!emails || !emails.length) {
// Update last checked timestamp
await updateWebhookLastChecked(
webhookId,
now.toISOString(),
latestHistoryId || config.historyId
)
logger.info(`[${requestId}] No new emails found for webhook ${webhookId}`)
return { success: true, webhookId, status: 'no_emails' }
}
logger.info(`[${requestId}] Found ${emails.length} new emails for webhook ${webhookId}`)
// Get processed email IDs (to avoid duplicates)
const processedEmailIds = config.processedEmailIds || []
// Filter out emails that have already been processed
const newEmails = emails.filter((email) => !processedEmailIds.includes(email.id))
if (newEmails.length === 0) {
logger.info(
`[${requestId}] All emails have already been processed for webhook ${webhookId}`
)
await updateWebhookLastChecked(
webhookId,
now.toISOString(),
latestHistoryId || config.historyId
)
return { success: true, webhookId, status: 'already_processed' }
}
logger.info(
`[${requestId}] Processing ${newEmails.length} new emails for webhook ${webhookId}`
)
// Process all emails (process each email as a separate workflow trigger)
const emailsToProcess = newEmails
// Process emails
const processed = await processEmails(
emailsToProcess,
webhookData,
config,
accessToken,
requestId
)
// Record which email IDs have been processed
const newProcessedIds = [
...processedEmailIds,
...emailsToProcess.map((email) => email.id),
]
// Keep only the most recent 100 IDs to prevent the list from growing too large
const trimmedProcessedIds = newProcessedIds.slice(-100)
// Update webhook with latest history ID, timestamp, and processed email IDs
await updateWebhookData(
webhookId,
now.toISOString(),
latestHistoryId || config.historyId,
trimmedProcessedIds
)
return {
success: true,
webhookId,
emailsFound: emails.length,
newEmails: newEmails.length,
emailsProcessed: processed,
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Error processing Gmail webhook ${webhookId}:`, error)
return { success: false, webhookId, error: errorMessage }
}
})
)
const summary = {
total: results.length,
successful: results.filter((r) => r.status === 'fulfilled' && r.value.success).length,
failed: results.filter(
(r) => r.status === 'rejected' || (r.status === 'fulfilled' && !r.value.success)
).length,
details: results.map((r) =>
r.status === 'fulfilled' ? r.value : { success: false, error: r.reason }
),
}
logger.info('Gmail polling completed', {
total: summary.total,
successful: summary.successful,
failed: summary.failed,
})
return summary
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error('Error in Gmail polling service:', errorMessage)
throw error
}
}
async function fetchNewEmails(accessToken: string, config: GmailWebhookConfig, requestId: string) {
try {
// Determine whether to use history API or search
const useHistoryApi = !!config.historyId
let emails = []
let latestHistoryId = config.historyId
if (useHistoryApi) {
// Use history API to get changes since last check
const historyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/history?startHistoryId=${config.historyId}`
const historyResponse = await fetch(historyUrl, {
headers: {
Authorization: `Bearer ${accessToken}`,
},
})
if (!historyResponse.ok) {
const errorData = await historyResponse.json()
logger.error(`[${requestId}] Gmail history API error:`, {
status: historyResponse.status,
statusText: historyResponse.statusText,
error: errorData,
})
// Fall back to search if history API fails
logger.info(`[${requestId}] Falling back to search API after history API failure`)
const searchResult = await searchEmails(accessToken, config, requestId)
return {
emails: searchResult.emails,
latestHistoryId: searchResult.latestHistoryId,
}
}
const historyData = await historyResponse.json()
if (!historyData.history || !historyData.history.length) {
return { emails: [], latestHistoryId }
}
// Update the latest history ID
if (historyData.historyId) {
latestHistoryId = historyData.historyId
}
// Extract message IDs from history
const messageIds = new Set<string>()
for (const history of historyData.history) {
if (history.messagesAdded) {
for (const messageAdded of history.messagesAdded) {
messageIds.add(messageAdded.message.id)
}
}
}
if (messageIds.size === 0) {
return { emails: [], latestHistoryId }
}
// Sort IDs by recency (reverse order)
const sortedIds = [...messageIds].sort().reverse()
// Process all emails but respect the configured limit
const idsToFetch = sortedIds.slice(0, config.maxEmailsPerPoll || 25)
logger.info(`[${requestId}] Processing ${idsToFetch.length} emails from history API`)
// Fetch full email details for each message
const emailPromises = idsToFetch.map(async (messageId) => {
return getEmailDetails(accessToken, messageId)
})
const emailResults = await Promise.allSettled(emailPromises)
emails = emailResults
.filter(
(result): result is PromiseFulfilledResult<GmailEmail> => result.status === 'fulfilled'
)
.map((result) => result.value)
// Filter emails by labels if needed
emails = filterEmailsByLabels(emails, config)
} else {
// Use search if no history ID is available
const searchResult = await searchEmails(accessToken, config, requestId)
return searchResult
}
return { emails, latestHistoryId }
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Error fetching new emails:`, errorMessage)
return { emails: [], latestHistoryId: config.historyId }
}
}
async function searchEmails(accessToken: string, config: GmailWebhookConfig, requestId: string) {
try {
// Build query parameters for label filtering
const labelQuery =
config.labelIds && config.labelIds.length > 0
? config.labelIds.map((label) => `label:${label}`).join(' ')
: 'in:inbox'
// Improved time-based filtering with dynamic buffer
let timeConstraint = ''
if (config.lastCheckedTimestamp) {
// Parse the last check time
const lastCheckedTime = new Date(config.lastCheckedTimestamp)
const now = new Date()
// Calculate minutes since last check
const minutesSinceLastCheck = (now.getTime() - lastCheckedTime.getTime()) / (60 * 1000)
// If last check was recent, use precise time-based query
if (minutesSinceLastCheck < 60) {
// Less than an hour ago
// Calculate buffer in seconds - the greater of:
// 1. Twice the configured polling interval (or 2 minutes if not set)
// 2. At least 5 minutes (300 seconds)
const bufferSeconds = Math.max((config.pollingInterval || 2) * 60 * 2, 300)
// Calculate the cutoff time with buffer
const cutoffTime = new Date(lastCheckedTime.getTime() - bufferSeconds * 1000)
// Format for Gmail's search syntax (seconds since epoch)
const timestamp = Math.floor(cutoffTime.getTime() / 1000)
timeConstraint = ` after:${timestamp}`
logger.debug(`[${requestId}] Using timestamp-based query with ${bufferSeconds}s buffer`)
}
// If last check was a while ago, use Gmail's relative time queries
else if (minutesSinceLastCheck < 24 * 60) {
// Less than a day
// Use newer_than:Xh syntax for better reliability with longer intervals
const hours = Math.ceil(minutesSinceLastCheck / 60) + 1 // Round up and add 1 hour buffer
timeConstraint = ` newer_than:${hours}h`
logger.debug(`[${requestId}] Using hour-based query: newer_than:${hours}h`)
} else {
// For very old last checks, limit to a reasonable time period (7 days max)
const days = Math.min(Math.ceil(minutesSinceLastCheck / (24 * 60)), 7) + 1
timeConstraint = ` newer_than:${days}d`
logger.debug(`[${requestId}] Using day-based query: newer_than:${days}d`)
}
} else {
// If there's no last checked timestamp, default to recent emails (last 24h)
timeConstraint = ' newer_than:1d'
logger.debug(`[${requestId}] No last check time, using default: newer_than:1d`)
}
// Combine label and time constraints
const query =
config.labelFilterBehavior === 'INCLUDE'
? `${labelQuery}${timeConstraint}`
: `-${labelQuery}${timeConstraint}`
logger.info(`[${requestId}] Searching for emails with query: ${query}`)
// Search for emails with lower default
const searchUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages?q=${encodeURIComponent(query)}&maxResults=${config.maxEmailsPerPoll || 25}`
const searchResponse = await fetch(searchUrl, {
headers: {
Authorization: `Bearer ${accessToken}`,
},
})
if (!searchResponse.ok) {
const errorData = await searchResponse.json()
logger.error(`[${requestId}] Gmail search API error:`, {
status: searchResponse.status,
statusText: searchResponse.statusText,
query: query,
error: errorData,
})
return { emails: [], latestHistoryId: config.historyId }
}
const searchData = await searchResponse.json()
if (!searchData.messages || !searchData.messages.length) {
logger.info(`[${requestId}] No emails found matching query: ${query}`)
return { emails: [], latestHistoryId: config.historyId }
}
// Process emails within the limit
let idsToFetch = searchData.messages.slice(0, config.maxEmailsPerPoll || 25)
let latestHistoryId = config.historyId
logger.info(
`[${requestId}] Processing ${idsToFetch.length} emails from search API (total matches: ${searchData.messages.length})`
)
// Fetch full email details for each message
const emailPromises = idsToFetch.map(async (message: { id: string }) => {
return getEmailDetails(accessToken, message.id)
})
const emailResults = await Promise.allSettled(emailPromises)
const emails = emailResults
.filter(
(result): result is PromiseFulfilledResult<GmailEmail> => result.status === 'fulfilled'
)
.map((result) => result.value)
// Get the latest history ID from the first email (most recent)
if (emails.length > 0 && emails[0].historyId) {
latestHistoryId = emails[0].historyId
logger.debug(`[${requestId}] Updated historyId to ${latestHistoryId}`)
}
return { emails, latestHistoryId }
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Error searching emails:`, errorMessage)
return { emails: [], latestHistoryId: config.historyId }
}
}
async function getEmailDetails(accessToken: string, messageId: string): Promise<GmailEmail> {
const messageUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}?format=full`
const messageResponse = await fetch(messageUrl, {
headers: {
Authorization: `Bearer ${accessToken}`,
},
})
if (!messageResponse.ok) {
const errorData = await messageResponse.json().catch(() => ({}))
throw new Error(
`Failed to fetch email details for message ${messageId}: ${messageResponse.status} ${messageResponse.statusText} - ${JSON.stringify(errorData)}`
)
}
return await messageResponse.json()
}
function filterEmailsByLabels(emails: GmailEmail[], config: GmailWebhookConfig): GmailEmail[] {
if (!config.labelIds.length) {
return emails
}
return emails.filter((email) => {
const emailLabels = email.labelIds || []
const hasMatchingLabel = config.labelIds.some((configLabel) =>
emailLabels.includes(configLabel)
)
return config.labelFilterBehavior === 'INCLUDE'
? hasMatchingLabel // Include emails with matching labels
: !hasMatchingLabel // Exclude emails with matching labels
})
}
async function processEmails(
emails: any[],
webhookData: any,
config: GmailWebhookConfig,
accessToken: string,
requestId: string
) {
let processedCount = 0
for (const email of emails) {
try {
// Extract useful information from email to create a simplified payload
// First, extract headers into a map for easy access
const headers: Record<string, string> = {}
if (email.payload?.headers) {
for (const header of email.payload.headers) {
headers[header.name.toLowerCase()] = header.value
}
}
// Extract and decode email body content
let textContent = ''
let htmlContent = ''
// Function to extract content from parts recursively
const extractContent = (part: any) => {
if (!part) return
// Extract current part content if it exists
if (part.mimeType === 'text/plain' && part.body?.data) {
textContent = Buffer.from(part.body.data, 'base64').toString('utf-8')
} else if (part.mimeType === 'text/html' && part.body?.data) {
htmlContent = Buffer.from(part.body.data, 'base64').toString('utf-8')
}
// Process nested parts
if (part.parts && Array.isArray(part.parts)) {
for (const subPart of part.parts) {
extractContent(subPart)
}
}
}
// Extract content from the email payload
if (email.payload) {
extractContent(email.payload)
}
// Parse date into standard format
let date: string | null = null
if (headers.date) {
try {
date = new Date(headers.date).toISOString()
} catch (e) {
// Keep date as null if parsing fails
}
} else if (email.internalDate) {
// Use internalDate as fallback (convert from timestamp to ISO string)
date = new Date(parseInt(email.internalDate)).toISOString()
}
// Extract attachment information if present
const attachments: Array<{ filename: string; mimeType: string; size: number }> = []
const findAttachments = (part: any) => {
if (!part) return
if (part.filename && part.filename.length > 0) {
attachments.push({
filename: part.filename,
mimeType: part.mimeType || 'application/octet-stream',
size: part.body?.size || 0,
})
}
// Look for attachments in nested parts
if (part.parts && Array.isArray(part.parts)) {
for (const subPart of part.parts) {
findAttachments(subPart)
}
}
}
if (email.payload) {
findAttachments(email.payload)
}
// Create simplified email object
const simplifiedEmail = {
id: email.id,
threadId: email.threadId,
// Basic info
subject: headers.subject || '[No Subject]',
from: headers.from || '',
to: headers.to || '',
cc: headers.cc || '',
date: date,
// Content
bodyText: textContent,
bodyHtml: htmlContent,
snippet: email.snippet || '',
// Metadata
labels: email.labelIds || [],
hasAttachments: attachments.length > 0,
attachments: attachments,
}
// Prepare webhook payload with simplified email
const payload = {
email: simplifiedEmail,
timestamp: new Date().toISOString(),
}
logger.debug(`[${requestId}] Sending simplified email payload for ${email.id}`)
// Trigger the webhook
const webhookUrl = `${getBaseUrl()}/api/webhooks/trigger/${webhookData.path}`
const response = await fetch(webhookUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Secret': webhookData.secret || '',
},
body: JSON.stringify(payload),
})
if (!response.ok) {
logger.error(
`[${requestId}] Failed to trigger webhook for email ${email.id}:`,
response.status,
await response.text()
)
continue
}
// Mark email as read if configured
if (config.markAsRead) {
await markEmailAsRead(accessToken, email.id)
}
processedCount++
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Error processing email ${email.id}:`, errorMessage)
}
}
return processedCount
}
async function markEmailAsRead(accessToken: string, messageId: string) {
const modifyUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages/${messageId}/modify`
try {
const response = await fetch(modifyUrl, {
method: 'POST',
headers: {
Authorization: `Bearer ${accessToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
removeLabelIds: ['UNREAD'],
}),
})
if (!response.ok) {
throw new Error(
`Failed to mark email ${messageId} as read: ${response.status} ${response.statusText}`
)
}
} catch (error) {
logger.error(`Error marking email ${messageId} as read:`, error)
throw error
}
}
async function updateWebhookLastChecked(webhookId: string, timestamp: string, historyId?: string) {
const existingConfig =
(await db.select().from(webhook).where(eq(webhook.id, webhookId)))[0]?.providerConfig || {}
await db
.update(webhook)
.set({
providerConfig: {
...existingConfig,
lastCheckedTimestamp: timestamp,
...(historyId ? { historyId } : {}),
},
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookId))
}
async function updateWebhookData(
webhookId: string,
timestamp: string,
historyId?: string,
processedEmailIds?: string[]
) {
const existingConfig =
(await db.select().from(webhook).where(eq(webhook.id, webhookId)))[0]?.providerConfig || {}
await db
.update(webhook)
.set({
providerConfig: {
...existingConfig,
lastCheckedTimestamp: timestamp,
...(historyId ? { historyId } : {}),
...(processedEmailIds ? { processedEmailIds } : {}),
},
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookId))
}

View File

@@ -652,6 +652,23 @@ export function verifyProviderWebhook(
break // No specific auth here
case 'stripe':
break // Stripe verification would go here
case 'gmail':
if (providerConfig.secret) {
const secretHeader = request.headers.get('X-Webhook-Secret')
if (!secretHeader || secretHeader.length !== providerConfig.secret.length) {
logger.warn(`[${requestId}] Invalid Gmail webhook secret`)
return new NextResponse('Unauthorized', { status: 401 })
}
let result = 0
for (let i = 0; i < secretHeader.length; i++) {
result |= secretHeader.charCodeAt(i) ^ providerConfig.secret.charCodeAt(i)
}
if (result !== 0) {
logger.warn(`[${requestId}] Invalid Gmail webhook secret`)
return new NextResponse('Unauthorized', { status: 401 })
}
}
break
case 'generic':
// Generic auth logic: requireAuth, token, secretHeaderName, allowedIps
if (providerConfig.requireAuth) {
@@ -1273,3 +1290,67 @@ export interface AirtableChange {
changedFields: Record<string, any> // { fieldId: newValue }
previousFields?: Record<string, any> // { fieldId: previousValue } (optional)
}
/**
* Configure Gmail polling for a webhook
*/
export async function configureGmailPolling(
userId: string,
webhookData: any,
requestId: string
): Promise<boolean> {
const logger = createLogger('GmailWebhookSetup')
logger.info(`[${requestId}] Setting up Gmail polling for webhook ${webhookData.id}`)
try {
const accessToken = await getOAuthToken(userId, 'google-email')
if (!accessToken) {
logger.error(`[${requestId}] Failed to retrieve Gmail access token for user ${userId}`)
return false
}
const providerConfig = (webhookData.providerConfig as Record<string, any>) || {}
const maxEmailsPerPoll =
typeof providerConfig.maxEmailsPerPoll === 'string'
? parseInt(providerConfig.maxEmailsPerPoll, 10) || 25
: providerConfig.maxEmailsPerPoll || 25
const pollingInterval =
typeof providerConfig.pollingInterval === 'string'
? parseInt(providerConfig.pollingInterval, 10) || 5
: providerConfig.pollingInterval || 5
const now = new Date()
await db
.update(webhook)
.set({
providerConfig: {
...providerConfig,
userId, // Store user ID for OAuth access during polling
maxEmailsPerPoll,
pollingInterval,
markAsRead: providerConfig.markAsRead || false,
labelIds: providerConfig.labelIds || ['INBOX'],
labelFilterBehavior: providerConfig.labelFilterBehavior || 'INCLUDE',
lastCheckedTimestamp: now.toISOString(),
setupCompleted: true,
},
updatedAt: now,
})
.where(eq(webhook.id, webhookData.id))
logger.info(
`[${requestId}] Successfully configured Gmail polling for webhook ${webhookData.id}`
)
return true
} catch (error: any) {
logger.error(`[${requestId}] Failed to configure Gmail polling`, {
webhookId: webhookData.id,
error: error.message,
stack: error.stack,
})
return false
}
}

View File

@@ -7,6 +7,11 @@
import { Mock, vi } from 'vitest'
import { ToolConfig, ToolResponse } from '../types'
// Define a type that combines Mock with fetch properties
type MockFetch = Mock & {
preconnect: Mock
}
/**
* Create standard mock headers for HTTP testing
*/
@@ -35,7 +40,7 @@ export function createMockFetch(
) {
const { ok = true, status = 200, headers = { 'Content-Type': 'application/json' } } = options
return vi.fn().mockResolvedValue({
const mockFn = vi.fn().mockResolvedValue({
ok,
status,
headers: {
@@ -51,6 +56,11 @@ export function createMockFetch(
typeof responseData === 'string' ? responseData : JSON.stringify(responseData)
),
})
// Add preconnect property to satisfy TypeScript
;(mockFn as any).preconnect = vi.fn()
return mockFn as MockFetch
}
/**
@@ -65,10 +75,12 @@ export function createErrorFetch(errorMessage: string, status = 400) {
// This better mimics different kinds of errors that can happen
if (status < 0) {
// Network error that causes the fetch to reject
return vi.fn().mockRejectedValue(error)
const mockFn = vi.fn().mockRejectedValue(error)
;(mockFn as any).preconnect = vi.fn()
return mockFn as MockFetch
} else {
// HTTP error with status code
return vi.fn().mockResolvedValue({
const mockFn = vi.fn().mockResolvedValue({
ok: false,
status,
statusText: errorMessage,
@@ -87,6 +99,8 @@ export function createErrorFetch(errorMessage: string, status = 400) {
})
),
})
;(mockFn as any).preconnect = vi.fn()
return mockFn as MockFetch
}
}
@@ -95,7 +109,7 @@ export function createErrorFetch(errorMessage: string, status = 400) {
*/
export class ToolTester<P = any, R = any> {
tool: ToolConfig<P, R>
private mockFetch: Mock
private mockFetch: MockFetch
private originalFetch: typeof fetch
private mockResponse: any
private mockResponseOptions: { ok: boolean; status: number; headers: Record<string, string> }
@@ -126,7 +140,7 @@ export class ToolTester<P = any, R = any> {
headers: options.headers ?? { 'content-type': 'application/json' },
}
this.mockFetch = createMockFetch(this.mockResponse, this.mockResponseOptions)
global.fetch = this.mockFetch
global.fetch = this.mockFetch as unknown as typeof fetch
return this
}
@@ -135,7 +149,7 @@ export class ToolTester<P = any, R = any> {
*/
setupError(errorMessage: string, status = 400) {
this.mockFetch = createErrorFetch(errorMessage, status)
global.fetch = this.mockFetch
global.fetch = this.mockFetch as unknown as typeof fetch
// Create an error object that the transformError function can use
this.error = new Error(errorMessage)
@@ -467,7 +481,8 @@ export function mockEnvironmentVariables(variables: Record<string, string>) {
export function mockOAuthTokenRequest(accessToken = 'mock-access-token') {
// Mock the fetch call to /api/auth/oauth/token
const originalFetch = global.fetch
const mockTokenFetch = vi.fn().mockImplementation((url, options) => {
const mockFn = vi.fn().mockImplementation((url, options) => {
if (url.toString().includes('/api/auth/oauth/token')) {
return Promise.resolve({
ok: true,
@@ -478,7 +493,12 @@ export function mockOAuthTokenRequest(accessToken = 'mock-access-token') {
return originalFetch(url, options)
})
global.fetch = mockTokenFetch
// Add preconnect property
;(mockFn as any).preconnect = vi.fn()
const mockTokenFetch = mockFn as MockFetch
global.fetch = mockTokenFetch as unknown as typeof fetch
// Return a cleanup function
return () => {

View File

@@ -112,7 +112,8 @@ describe('Gmail Read Tool', () => {
// Then setup response for the first message
const originalFetch = global.fetch
global.fetch = vi.fn().mockImplementation((url, options) => {
global.fetch = Object.assign(
vi.fn().mockImplementation((url, options) => {
// Check if it's a token request
if (url.toString().includes('/api/auth/oauth/token')) {
return Promise.resolve({
@@ -149,7 +150,9 @@ describe('Gmail Read Tool', () => {
}
return originalFetch(url, options)
})
}),
{ preconnect: vi.fn() }
) as typeof fetch
// Execute with credential instead of access token
await tester.execute({
@@ -196,7 +199,8 @@ describe('Gmail Read Tool', () => {
const originalFetch = global.fetch
// First setup response for message list
global.fetch = vi
global.fetch = Object.assign(
vi
.fn()
.mockImplementationOnce((url, options) => {
return Promise.resolve({
@@ -220,7 +224,9 @@ describe('Gmail Read Tool', () => {
forEach: () => {},
},
})
})
}),
{ preconnect: vi.fn() }
) as typeof fetch
// Execute the tool
const result = await tester.execute({

View File

@@ -135,7 +135,8 @@ describe('executeTool Function', () => {
beforeEach(() => {
// Mock fetch
global.fetch = vi.fn().mockImplementation(async (url, options) => {
global.fetch = Object.assign(
vi.fn().mockImplementation(async (url, options) => {
if (url.toString().includes('/api/proxy')) {
return {
ok: true,
@@ -161,7 +162,9 @@ describe('executeTool Function', () => {
forEach: () => {},
},
}
})
}),
{ preconnect: vi.fn() }
) as typeof fetch
// Set environment variables
process.env.NEXT_PUBLIC_APP_URL = 'http://localhost:3000'
@@ -244,7 +247,8 @@ describe('executeTool Function', () => {
test('should handle errors from tools', async () => {
// Mock a failed response
global.fetch = vi.fn().mockImplementation(async () => {
global.fetch = Object.assign(
vi.fn().mockImplementation(async () => {
return {
ok: false,
status: 400,
@@ -253,7 +257,9 @@ describe('executeTool Function', () => {
error: 'Bad request',
}),
}
})
}),
{ preconnect: vi.fn() }
) as typeof fetch
const result = await executeTool(
'http_request',

View File

@@ -3,6 +3,10 @@
{
"path": "/api/schedules/execute",
"schedule": "*/1 * * * *"
},
{
"path": "/api/webhooks/poll/gmail",
"schedule": "*/1 * * * *"
}
]
}