feat(trigger): add Google Sheets, Drive, and Calendar polling triggers

Add polling triggers for Google Sheets (new rows), Google Drive (file
changes via changes.list API), and Google Calendar (event updates via
updatedMin). Each includes OAuth credential support, configurable
filters (event type, MIME type, folder, search term, render options),
idempotency, and first-poll seeding. Wire triggers into block configs
and regenerate integrations.json. Update add-trigger skill with polling
instructions and versioned block wiring guidance.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Waleed Latif
2026-04-09 14:03:09 -07:00
committed by waleed
parent 4074109362
commit 06b68f7463
19 changed files with 2114 additions and 26 deletions

View File

@@ -1,17 +1,17 @@
---
description: Create webhook triggers for a Sim integration using the generic trigger builder
description: Create webhook or polling triggers for a Sim integration
argument-hint: <service-name>
---
# Add Trigger
You are an expert at creating webhook triggers for Sim. You understand the trigger system, the generic `buildTriggerSubBlocks` helper, and how triggers connect to blocks.
You are an expert at creating webhook and polling triggers for Sim. You understand the trigger system, the generic `buildTriggerSubBlocks` helper, polling infrastructure, and how triggers connect to blocks.
## Your Task
1. Research what webhook events the service supports
2. Create the trigger files using the generic builder
3. Create a provider handler if custom auth, formatting, or subscriptions are needed
1. Research what webhook events the service supports — if the service lacks reliable webhooks, use polling
2. Create the trigger files using the generic builder (webhook) or manual config (polling)
3. Create a provider handler (webhook) or polling handler (polling)
4. Register triggers and connect them to the block
## Directory Structure
@@ -146,23 +146,37 @@ export const TRIGGER_REGISTRY: TriggerRegistry = {
### Block file (`apps/sim/blocks/blocks/{service}.ts`)
Wire triggers into the block so the trigger UI appears and `generate-docs.ts` discovers them. Two changes are needed:
1. **Spread trigger subBlocks** at the end of the block's `subBlocks` array
2. **Add `triggers` property** after `outputs` with `enabled: true` and `available: [...]`
```typescript
import { getTrigger } from '@/triggers'
export const {Service}Block: BlockConfig = {
// ...
triggers: {
enabled: true,
available: ['{service}_event_a', '{service}_event_b'],
},
subBlocks: [
// Regular tool subBlocks first...
...getTrigger('{service}_event_a').subBlocks,
...getTrigger('{service}_event_b').subBlocks,
],
// ... tools, inputs, outputs ...
triggers: {
enabled: true,
available: ['{service}_event_a', '{service}_event_b'],
},
}
```
**Versioned blocks (V1 + V2):** Many integrations have a hidden V1 block and a visible V2 block. Where you add the trigger wiring depends on how V2 inherits from V1:
- **V2 uses `...V1Block` spread** (e.g., Google Calendar): Add trigger to V1 — V2 inherits both `subBlocks` and `triggers` automatically.
- **V2 defines its own `subBlocks`** (e.g., Google Sheets): Add trigger to V2 (the visible block). V1 is hidden and doesn't need it.
- **Single block, no V2** (e.g., Google Drive): Add trigger directly.
`generate-docs.ts` deduplicates by base type (first match wins). If V1 is processed first without triggers, the V2 triggers won't appear in `integrations.json`. Always verify by checking the output after running the script.
## Provider Handler
All provider-specific webhook logic lives in a single handler file: `apps/sim/lib/webhooks/providers/{service}.ts`.
@@ -327,6 +341,122 @@ export function buildOutputs(): Record<string, TriggerOutput> {
}
```
## Polling Triggers
Use polling when the service lacks reliable webhooks (e.g., Google Sheets, Google Drive, Google Calendar, Gmail, RSS, IMAP). Polling triggers do NOT use `buildTriggerSubBlocks` — they define subBlocks manually.
### Directory Structure
```
apps/sim/triggers/{service}/
├── index.ts # Barrel export
└── poller.ts # TriggerConfig with polling: true
apps/sim/lib/webhooks/polling/
└── {service}.ts # PollingProviderHandler implementation
```
### Polling Handler (`apps/sim/lib/webhooks/polling/{service}.ts`)
```typescript
import { pollingIdempotency } from '@/lib/core/idempotency/service'
import type { PollingProviderHandler, PollWebhookContext } from '@/lib/webhooks/polling/types'
import { markWebhookFailed, markWebhookSuccess, resolveOAuthCredential, updateWebhookProviderConfig } from '@/lib/webhooks/polling/utils'
import { processPolledWebhookEvent } from '@/lib/webhooks/processor'
export const {service}PollingHandler: PollingProviderHandler = {
provider: '{service}',
label: '{Service}',
async pollWebhook(ctx: PollWebhookContext): Promise<'success' | 'failure'> {
const { webhookData, workflowData, requestId, logger } = ctx
const webhookId = webhookData.id
try {
// For OAuth services:
const accessToken = await resolveOAuthCredential(webhookData, '{service}', requestId, logger)
const config = webhookData.providerConfig as unknown as {Service}WebhookConfig
// First poll: seed state, emit nothing
if (!config.lastCheckedTimestamp) {
await updateWebhookProviderConfig(webhookId, { lastCheckedTimestamp: new Date().toISOString() }, logger)
await markWebhookSuccess(webhookId, logger)
return 'success'
}
// Fetch changes since last poll, process with idempotency
// ...
await markWebhookSuccess(webhookId, logger)
return 'success'
} catch (error) {
logger.error(`[${requestId}] Error processing {service} webhook ${webhookId}:`, error)
await markWebhookFailed(webhookId, logger)
return 'failure'
}
},
}
```
**Key patterns:**
- First poll seeds state and emits nothing (avoids flooding with existing data)
- Use `pollingIdempotency.executeWithIdempotency(provider, key, callback)` for dedup
- Use `processPolledWebhookEvent(webhookData, workflowData, payload, requestId)` to fire the workflow
- Use `updateWebhookProviderConfig(webhookId, partialConfig, logger)` for read-merge-write on state
- Use the latest server-side timestamp from API responses (not wall clock) to avoid clock skew
### Trigger Config (`apps/sim/triggers/{service}/poller.ts`)
```typescript
import { {Service}Icon } from '@/components/icons'
import type { TriggerConfig } from '@/triggers/types'
export const {service}PollingTrigger: TriggerConfig = {
id: '{service}_poller',
name: '{Service} Trigger',
provider: '{service}',
description: 'Triggers when ...',
version: '1.0.0',
icon: {Service}Icon,
polling: true, // REQUIRED — routes to polling infrastructure
subBlocks: [
{ id: 'triggerCredentials', type: 'oauth-input', title: 'Credentials', serviceId: '{service}', requiredScopes: [], required: true, mode: 'trigger', supportsCredentialSets: true },
// ... service-specific config fields (dropdowns, inputs, switches) ...
{ id: 'triggerSave', type: 'trigger-save', title: '', hideFromPreview: true, mode: 'trigger', triggerId: '{service}_poller' },
{ id: 'triggerInstructions', type: 'text', title: 'Setup Instructions', hideFromPreview: true, mode: 'trigger', defaultValue: '...' },
],
outputs: {
// Must match the payload shape from processPolledWebhookEvent
},
}
```
### Registration (3 places)
1. **`apps/sim/triggers/constants.ts`** — add provider to `POLLING_PROVIDERS` Set
2. **`apps/sim/lib/webhooks/polling/registry.ts`** — import handler, add to `POLLING_HANDLERS`
3. **`apps/sim/triggers/registry.ts`** — import trigger config, add to `TRIGGER_REGISTRY`
### Helm Cron Job
Add to `helm/sim/values.yaml` under the existing polling cron jobs:
```yaml
{service}WebhookPoll:
schedule: "*/1 * * * *"
concurrencyPolicy: Forbid
url: "http://sim:3000/api/webhooks/poll/{service}"
```
### Reference Implementations
- Simple: `apps/sim/lib/webhooks/polling/rss.ts` + `apps/sim/triggers/rss/poller.ts`
- Complex (OAuth, attachments): `apps/sim/lib/webhooks/polling/gmail.ts` + `apps/sim/triggers/gmail/poller.ts`
- Cursor-based (changes API): `apps/sim/lib/webhooks/polling/google-drive.ts`
- Timestamp-based: `apps/sim/lib/webhooks/polling/google-calendar.ts`
## Checklist
### Trigger Definition
@@ -352,7 +482,18 @@ export function buildOutputs(): Record<string, TriggerOutput> {
- [ ] NO changes to `route.ts`, `provider-subscriptions.ts`, or `deploy.ts`
- [ ] API key field uses `password: true`
### Polling Trigger (if applicable)
- [ ] Handler implements `PollingProviderHandler` at `lib/webhooks/polling/{service}.ts`
- [ ] Trigger config has `polling: true` and defines subBlocks manually (no `buildTriggerSubBlocks`)
- [ ] Provider string matches across: trigger config, handler, `POLLING_PROVIDERS`, polling registry
- [ ] `triggerSave` subBlock `triggerId` matches trigger config `id`
- [ ] First poll seeds state and emits nothing
- [ ] Added provider to `POLLING_PROVIDERS` in `triggers/constants.ts`
- [ ] Added handler to `POLLING_HANDLERS` in `lib/webhooks/polling/registry.ts`
- [ ] Added cron job to `helm/sim/values.yaml`
- [ ] Payload shape matches trigger `outputs` schema
### Testing
- [ ] `bun run type-check` passes
- [ ] Manually verify `formatInput` output keys match trigger `outputs` keys
- [ ] Manually verify output keys match trigger `outputs` keys
- [ ] Trigger UI shows correctly in the block

View File

@@ -1,12 +1,12 @@
# Add Trigger
You are an expert at creating webhook triggers for Sim. You understand the trigger system, the generic `buildTriggerSubBlocks` helper, and how triggers connect to blocks.
You are an expert at creating webhook and polling triggers for Sim. You understand the trigger system, the generic `buildTriggerSubBlocks` helper, polling infrastructure, and how triggers connect to blocks.
## Your Task
1. Research what webhook events the service supports
2. Create the trigger files using the generic builder
3. Create a provider handler if custom auth, formatting, or subscriptions are needed
1. Research what webhook events the service supports — if the service lacks reliable webhooks, use polling
2. Create the trigger files using the generic builder (webhook) or manual config (polling)
3. Create a provider handler (webhook) or polling handler (polling)
4. Register triggers and connect them to the block
## Directory Structure
@@ -141,23 +141,37 @@ export const TRIGGER_REGISTRY: TriggerRegistry = {
### Block file (`apps/sim/blocks/blocks/{service}.ts`)
Wire triggers into the block so the trigger UI appears and `generate-docs.ts` discovers them. Two changes are needed:
1. **Spread trigger subBlocks** at the end of the block's `subBlocks` array
2. **Add `triggers` property** after `outputs` with `enabled: true` and `available: [...]`
```typescript
import { getTrigger } from '@/triggers'
export const {Service}Block: BlockConfig = {
// ...
triggers: {
enabled: true,
available: ['{service}_event_a', '{service}_event_b'],
},
subBlocks: [
// Regular tool subBlocks first...
...getTrigger('{service}_event_a').subBlocks,
...getTrigger('{service}_event_b').subBlocks,
],
// ... tools, inputs, outputs ...
triggers: {
enabled: true,
available: ['{service}_event_a', '{service}_event_b'],
},
}
```
**Versioned blocks (V1 + V2):** Many integrations have a hidden V1 block and a visible V2 block. Where you add the trigger wiring depends on how V2 inherits from V1:
- **V2 uses `...V1Block` spread** (e.g., Google Calendar): Add trigger to V1 — V2 inherits both `subBlocks` and `triggers` automatically.
- **V2 defines its own `subBlocks`** (e.g., Google Sheets): Add trigger to V2 (the visible block). V1 is hidden and doesn't need it.
- **Single block, no V2** (e.g., Google Drive): Add trigger directly.
`generate-docs.ts` deduplicates by base type (first match wins). If V1 is processed first without triggers, the V2 triggers won't appear in `integrations.json`. Always verify by checking the output after running the script.
## Provider Handler
All provider-specific webhook logic lives in a single handler file: `apps/sim/lib/webhooks/providers/{service}.ts`.
@@ -322,6 +336,122 @@ export function buildOutputs(): Record<string, TriggerOutput> {
}
```
## Polling Triggers
Use polling when the service lacks reliable webhooks (e.g., Google Sheets, Google Drive, Google Calendar, Gmail, RSS, IMAP). Polling triggers do NOT use `buildTriggerSubBlocks` — they define subBlocks manually.
### Directory Structure
```
apps/sim/triggers/{service}/
├── index.ts # Barrel export
└── poller.ts # TriggerConfig with polling: true
apps/sim/lib/webhooks/polling/
└── {service}.ts # PollingProviderHandler implementation
```
### Polling Handler (`apps/sim/lib/webhooks/polling/{service}.ts`)
```typescript
import { pollingIdempotency } from '@/lib/core/idempotency/service'
import type { PollingProviderHandler, PollWebhookContext } from '@/lib/webhooks/polling/types'
import { markWebhookFailed, markWebhookSuccess, resolveOAuthCredential, updateWebhookProviderConfig } from '@/lib/webhooks/polling/utils'
import { processPolledWebhookEvent } from '@/lib/webhooks/processor'
export const {service}PollingHandler: PollingProviderHandler = {
provider: '{service}',
label: '{Service}',
async pollWebhook(ctx: PollWebhookContext): Promise<'success' | 'failure'> {
const { webhookData, workflowData, requestId, logger } = ctx
const webhookId = webhookData.id
try {
// For OAuth services:
const accessToken = await resolveOAuthCredential(webhookData, '{service}', requestId, logger)
const config = webhookData.providerConfig as unknown as {Service}WebhookConfig
// First poll: seed state, emit nothing
if (!config.lastCheckedTimestamp) {
await updateWebhookProviderConfig(webhookId, { lastCheckedTimestamp: new Date().toISOString() }, logger)
await markWebhookSuccess(webhookId, logger)
return 'success'
}
// Fetch changes since last poll, process with idempotency
// ...
await markWebhookSuccess(webhookId, logger)
return 'success'
} catch (error) {
logger.error(`[${requestId}] Error processing {service} webhook ${webhookId}:`, error)
await markWebhookFailed(webhookId, logger)
return 'failure'
}
},
}
```
**Key patterns:**
- First poll seeds state and emits nothing (avoids flooding with existing data)
- Use `pollingIdempotency.executeWithIdempotency(provider, key, callback)` for dedup
- Use `processPolledWebhookEvent(webhookData, workflowData, payload, requestId)` to fire the workflow
- Use `updateWebhookProviderConfig(webhookId, partialConfig, logger)` for read-merge-write on state
- Use the latest server-side timestamp from API responses (not wall clock) to avoid clock skew
### Trigger Config (`apps/sim/triggers/{service}/poller.ts`)
```typescript
import { {Service}Icon } from '@/components/icons'
import type { TriggerConfig } from '@/triggers/types'
export const {service}PollingTrigger: TriggerConfig = {
id: '{service}_poller',
name: '{Service} Trigger',
provider: '{service}',
description: 'Triggers when ...',
version: '1.0.0',
icon: {Service}Icon,
polling: true, // REQUIRED — routes to polling infrastructure
subBlocks: [
{ id: 'triggerCredentials', type: 'oauth-input', title: 'Credentials', serviceId: '{service}', requiredScopes: [], required: true, mode: 'trigger', supportsCredentialSets: true },
// ... service-specific config fields (dropdowns, inputs, switches) ...
{ id: 'triggerSave', type: 'trigger-save', title: '', hideFromPreview: true, mode: 'trigger', triggerId: '{service}_poller' },
{ id: 'triggerInstructions', type: 'text', title: 'Setup Instructions', hideFromPreview: true, mode: 'trigger', defaultValue: '...' },
],
outputs: {
// Must match the payload shape from processPolledWebhookEvent
},
}
```
### Registration (3 places)
1. **`apps/sim/triggers/constants.ts`** — add provider to `POLLING_PROVIDERS` Set
2. **`apps/sim/lib/webhooks/polling/registry.ts`** — import handler, add to `POLLING_HANDLERS`
3. **`apps/sim/triggers/registry.ts`** — import trigger config, add to `TRIGGER_REGISTRY`
### Helm Cron Job
Add to `helm/sim/values.yaml` under the existing polling cron jobs:
```yaml
{service}WebhookPoll:
schedule: "*/1 * * * *"
concurrencyPolicy: Forbid
url: "http://sim:3000/api/webhooks/poll/{service}"
```
### Reference Implementations
- Simple: `apps/sim/lib/webhooks/polling/rss.ts` + `apps/sim/triggers/rss/poller.ts`
- Complex (OAuth, attachments): `apps/sim/lib/webhooks/polling/gmail.ts` + `apps/sim/triggers/gmail/poller.ts`
- Cursor-based (changes API): `apps/sim/lib/webhooks/polling/google-drive.ts`
- Timestamp-based: `apps/sim/lib/webhooks/polling/google-calendar.ts`
## Checklist
### Trigger Definition
@@ -347,7 +477,18 @@ export function buildOutputs(): Record<string, TriggerOutput> {
- [ ] NO changes to `route.ts`, `provider-subscriptions.ts`, or `deploy.ts`
- [ ] API key field uses `password: true`
### Polling Trigger (if applicable)
- [ ] Handler implements `PollingProviderHandler` at `lib/webhooks/polling/{service}.ts`
- [ ] Trigger config has `polling: true` and defines subBlocks manually (no `buildTriggerSubBlocks`)
- [ ] Provider string matches across: trigger config, handler, `POLLING_PROVIDERS`, polling registry
- [ ] `triggerSave` subBlock `triggerId` matches trigger config `id`
- [ ] First poll seeds state and emits nothing
- [ ] Added provider to `POLLING_PROVIDERS` in `triggers/constants.ts`
- [ ] Added handler to `POLLING_HANDLERS` in `lib/webhooks/polling/registry.ts`
- [ ] Added cron job to `helm/sim/values.yaml`
- [ ] Payload shape matches trigger `outputs` schema
### Testing
- [ ] `bun run type-check` passes
- [ ] Manually verify `formatInput` output keys match trigger `outputs` keys
- [ ] Manually verify output keys match trigger `outputs` keys
- [ ] Trigger UI shows correctly in the block

View File

@@ -4421,8 +4421,14 @@
}
],
"operationCount": 10,
"triggers": [],
"triggerCount": 0,
"triggers": [
{
"id": "google_calendar_poller",
"name": "Google Calendar Event Trigger",
"description": "Triggers when events are created, updated, or cancelled in Google Calendar"
}
],
"triggerCount": 1,
"authType": "oauth",
"category": "tools",
"integrationType": "productivity",
@@ -4570,8 +4576,14 @@
}
],
"operationCount": 14,
"triggers": [],
"triggerCount": 0,
"triggers": [
{
"id": "google_drive_poller",
"name": "Google Drive File Trigger",
"description": "Triggers when files are created, modified, or deleted in Google Drive"
}
],
"triggerCount": 1,
"authType": "oauth",
"category": "tools",
"integrationType": "file-storage",
@@ -4927,8 +4939,14 @@
}
],
"operationCount": 11,
"triggers": [],
"triggerCount": 0,
"triggers": [
{
"id": "google_sheets_poller",
"name": "Google Sheets New Row Trigger",
"description": "Triggers when new rows are added to a Google Sheet"
}
],
"triggerCount": 1,
"authType": "oauth",
"category": "tools",
"integrationType": "documents",

View File

@@ -4,6 +4,7 @@ import type { BlockConfig } from '@/blocks/types'
import { AuthMode, IntegrationType } from '@/blocks/types'
import { createVersionedToolSelector, SERVICE_ACCOUNT_SUBBLOCKS } from '@/blocks/utils'
import type { GoogleCalendarResponse } from '@/tools/google_calendar/types'
import { getTrigger } from '@/triggers'
export const GoogleCalendarBlock: BlockConfig<GoogleCalendarResponse> = {
type: 'google_calendar',
@@ -488,6 +489,7 @@ Return ONLY the natural language event text - no explanations.`,
{ label: 'None (no emails sent)', id: 'none' },
],
},
...getTrigger('google_calendar_poller').subBlocks,
],
tools: {
access: [
@@ -644,6 +646,10 @@ Return ONLY the natural language event text - no explanations.`,
content: { type: 'string', description: 'Operation response content' },
metadata: { type: 'json', description: 'Event or calendar metadata' },
},
triggers: {
enabled: true,
available: ['google_calendar_poller'],
},
}
export const GoogleCalendarV2Block: BlockConfig<GoogleCalendarResponse> = {

View File

@@ -4,6 +4,7 @@ import type { BlockConfig } from '@/blocks/types'
import { AuthMode, IntegrationType } from '@/blocks/types'
import { normalizeFileInput, SERVICE_ACCOUNT_SUBBLOCKS } from '@/blocks/utils'
import type { GoogleDriveResponse } from '@/tools/google_drive/types'
import { getTrigger } from '@/triggers'
export const GoogleDriveBlock: BlockConfig<GoogleDriveResponse> = {
type: 'google_drive',
@@ -719,6 +720,7 @@ Return ONLY the message text - no subject line, no greetings/signatures, no extr
required: true,
},
// Get Drive Info has no additional fields (just needs credential)
...getTrigger('google_drive_poller').subBlocks,
],
tools: {
access: [
@@ -939,4 +941,8 @@ Return ONLY the message text - no subject line, no greetings/signatures, no extr
deleted: { type: 'boolean', description: 'Whether file was deleted' },
removed: { type: 'boolean', description: 'Whether permission was removed' },
},
triggers: {
enabled: true,
available: ['google_drive_poller'],
},
}

View File

@@ -4,6 +4,7 @@ import type { BlockConfig } from '@/blocks/types'
import { AuthMode, IntegrationType } from '@/blocks/types'
import { createVersionedToolSelector, SERVICE_ACCOUNT_SUBBLOCKS } from '@/blocks/utils'
import type { GoogleSheetsResponse, GoogleSheetsV2Response } from '@/tools/google_sheets/types'
import { getTrigger } from '@/triggers'
// Legacy block - hidden from toolbar
export const GoogleSheetsBlock: BlockConfig<GoogleSheetsResponse> = {
@@ -716,6 +717,7 @@ Return ONLY the JSON array - no explanations, no markdown, no extra text.`,
condition: { field: 'operation', value: 'copy_sheet' },
required: true,
},
...getTrigger('google_sheets_poller').subBlocks,
],
tools: {
access: [
@@ -1068,4 +1070,8 @@ Return ONLY the JSON array - no explanations, no markdown, no extra text.`,
},
},
},
triggers: {
enabled: true,
available: ['google_sheets_poller'],
},
}

View File

@@ -0,0 +1,347 @@
import { pollingIdempotency } from '@/lib/core/idempotency/service'
import type { PollingProviderHandler, PollWebhookContext } from '@/lib/webhooks/polling/types'
import {
markWebhookFailed,
markWebhookSuccess,
resolveOAuthCredential,
updateWebhookProviderConfig,
} from '@/lib/webhooks/polling/utils'
import { processPolledWebhookEvent } from '@/lib/webhooks/processor'
const CALENDAR_API_BASE = 'https://www.googleapis.com/calendar/v3'
const MAX_EVENTS_PER_POLL = 50
const MAX_PAGES = 10
type CalendarEventTypeFilter = '' | 'created' | 'updated' | 'cancelled'
interface GoogleCalendarWebhookConfig {
calendarId: string
eventTypeFilter?: CalendarEventTypeFilter
searchTerm?: string
lastCheckedTimestamp?: string
maxEventsPerPoll?: number
}
interface CalendarEventAttendee {
email: string
displayName?: string
responseStatus?: string
self?: boolean
organizer?: boolean
}
interface CalendarEventPerson {
email: string
displayName?: string
self?: boolean
}
interface CalendarEventTime {
dateTime?: string
date?: string
timeZone?: string
}
interface CalendarEvent {
id: string
status: string
htmlLink?: string
created?: string
updated?: string
summary?: string
description?: string
location?: string
start?: CalendarEventTime
end?: CalendarEventTime
attendees?: CalendarEventAttendee[]
creator?: CalendarEventPerson
organizer?: CalendarEventPerson
recurringEventId?: string
}
interface SimplifiedCalendarEvent {
id: string
status: string
eventType: 'created' | 'updated' | 'cancelled'
summary: string | null
eventDescription: string | null
location: string | null
htmlLink: string | null
start: CalendarEventTime | null
end: CalendarEventTime | null
created: string | null
updated: string | null
attendees: CalendarEventAttendee[] | null
creator: CalendarEventPerson | null
organizer: CalendarEventPerson | null
}
export interface GoogleCalendarWebhookPayload {
event: SimplifiedCalendarEvent
calendarId: string
timestamp: string
}
export const googleCalendarPollingHandler: PollingProviderHandler = {
provider: 'google-calendar',
label: 'Google Calendar',
async pollWebhook(ctx: PollWebhookContext): Promise<'success' | 'failure'> {
const { webhookData, workflowData, requestId, logger } = ctx
const webhookId = webhookData.id
try {
const accessToken = await resolveOAuthCredential(
webhookData,
'google-calendar',
requestId,
logger
)
const config = webhookData.providerConfig as unknown as GoogleCalendarWebhookConfig
const calendarId = config.calendarId || 'primary'
const now = new Date()
// First poll: seed timestamp, emit nothing
if (!config.lastCheckedTimestamp) {
await updateWebhookProviderConfig(
webhookId,
{ lastCheckedTimestamp: now.toISOString() },
logger
)
await markWebhookSuccess(webhookId, logger)
logger.info(`[${requestId}] First poll for webhook ${webhookId}, seeded timestamp`)
return 'success'
}
// Fetch changed events since last poll
const events = await fetchChangedEvents(accessToken, calendarId, config, requestId, logger)
if (!events.length) {
await updateWebhookProviderConfig(
webhookId,
{ lastCheckedTimestamp: now.toISOString() },
logger
)
await markWebhookSuccess(webhookId, logger)
logger.info(`[${requestId}] No changed events for webhook ${webhookId}`)
return 'success'
}
logger.info(`[${requestId}] Found ${events.length} changed events for webhook ${webhookId}`)
const { processedCount, failedCount, latestUpdated } = await processEvents(
events,
calendarId,
config.eventTypeFilter,
webhookData,
workflowData,
requestId,
logger
)
// Use the latest `updated` value from response to avoid clock skew
const newTimestamp = latestUpdated || now.toISOString()
await updateWebhookProviderConfig(webhookId, { lastCheckedTimestamp: newTimestamp }, logger)
if (failedCount > 0 && processedCount === 0) {
await markWebhookFailed(webhookId, logger)
logger.warn(
`[${requestId}] All ${failedCount} events failed to process for webhook ${webhookId}`
)
return 'failure'
}
await markWebhookSuccess(webhookId, logger)
logger.info(
`[${requestId}] Successfully processed ${processedCount} events for webhook ${webhookId}${failedCount > 0 ? ` (${failedCount} failed)` : ''}`
)
return 'success'
} catch (error) {
logger.error(`[${requestId}] Error processing Google Calendar webhook ${webhookId}:`, error)
await markWebhookFailed(webhookId, logger)
return 'failure'
}
},
}
async function fetchChangedEvents(
accessToken: string,
calendarId: string,
config: GoogleCalendarWebhookConfig,
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<CalendarEvent[]> {
const allEvents: CalendarEvent[] = []
const maxEvents = config.maxEventsPerPoll || MAX_EVENTS_PER_POLL
let pageToken: string | undefined
let pages = 0
do {
pages++
const params = new URLSearchParams({
updatedMin: config.lastCheckedTimestamp!,
singleEvents: 'true',
showDeleted: 'true',
orderBy: 'updated',
maxResults: String(Math.min(maxEvents, 250)),
})
if (pageToken) {
params.set('pageToken', pageToken)
}
if (config.searchTerm) {
params.set('q', config.searchTerm)
}
const encodedCalendarId = encodeURIComponent(calendarId)
const url = `${CALENDAR_API_BASE}/calendars/${encodedCalendarId}/events?${params.toString()}`
const response = await fetch(url, {
headers: { Authorization: `Bearer ${accessToken}` },
})
if (!response.ok) {
const status = response.status
const errorData = await response.json().catch(() => ({}))
if (status === 403 || status === 429) {
throw new Error(
`Calendar API rate limit (${status}) — skipping to retry next poll cycle: ${JSON.stringify(errorData)}`
)
}
throw new Error(`Failed to fetch calendar events: ${status} - ${JSON.stringify(errorData)}`)
}
const data = await response.json()
const events = (data.items || []) as CalendarEvent[]
allEvents.push(...events)
pageToken = data.nextPageToken as string | undefined
// Stop if we have enough events or hit the page limit
if (allEvents.length >= maxEvents || pages >= MAX_PAGES) {
break
}
} while (pageToken)
return allEvents.slice(0, maxEvents)
}
function determineEventType(event: CalendarEvent): 'created' | 'updated' | 'cancelled' {
if (event.status === 'cancelled') {
return 'cancelled'
}
// If created and updated are within 5 seconds, treat as newly created
if (event.created && event.updated) {
const createdTime = new Date(event.created).getTime()
const updatedTime = new Date(event.updated).getTime()
if (Math.abs(updatedTime - createdTime) < 5000) {
return 'created'
}
}
return 'updated'
}
function simplifyEvent(
event: CalendarEvent,
eventType?: 'created' | 'updated' | 'cancelled'
): SimplifiedCalendarEvent {
return {
id: event.id,
status: event.status,
eventType: eventType ?? determineEventType(event),
summary: event.summary ?? null,
eventDescription: event.description ?? null,
location: event.location ?? null,
htmlLink: event.htmlLink ?? null,
start: event.start ?? null,
end: event.end ?? null,
created: event.created ?? null,
updated: event.updated ?? null,
attendees: event.attendees ?? null,
creator: event.creator ?? null,
organizer: event.organizer ?? null,
}
}
async function processEvents(
events: CalendarEvent[],
calendarId: string,
eventTypeFilter: CalendarEventTypeFilter | undefined,
webhookData: PollWebhookContext['webhookData'],
workflowData: PollWebhookContext['workflowData'],
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<{ processedCount: number; failedCount: number; latestUpdated: string | null }> {
let processedCount = 0
let failedCount = 0
let latestUpdated: string | null = null
for (const event of events) {
// Track the latest `updated` timestamp for clock-skew-free state tracking
if (event.updated) {
if (!latestUpdated || event.updated > latestUpdated) {
latestUpdated = event.updated
}
}
// Client-side event type filter — skip before idempotency so filtered events aren't cached
const computedEventType = determineEventType(event)
if (eventTypeFilter && computedEventType !== eventTypeFilter) {
continue
}
try {
// Idempotency key includes `updated` so re-edits of the same event re-trigger
const idempotencyKey = `${webhookData.id}:${event.id}:${event.updated || event.created || ''}`
await pollingIdempotency.executeWithIdempotency(
'google-calendar',
idempotencyKey,
async () => {
const simplified = simplifyEvent(event, computedEventType)
const payload: GoogleCalendarWebhookPayload = {
event: simplified,
calendarId,
timestamp: new Date().toISOString(),
}
const result = await processPolledWebhookEvent(
webhookData,
workflowData,
payload,
requestId
)
if (!result.success) {
logger.error(
`[${requestId}] Failed to process webhook for event ${event.id}:`,
result.statusCode,
result.error
)
throw new Error(`Webhook processing failed (${result.statusCode}): ${result.error}`)
}
return { eventId: event.id, processed: true }
}
)
logger.info(
`[${requestId}] Successfully processed event ${event.id} for webhook ${webhookData.id}`
)
processedCount++
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Error processing event ${event.id}:`, errorMessage)
failedCount++
}
}
return { processedCount, failedCount, latestUpdated }
}

View File

@@ -0,0 +1,386 @@
import { pollingIdempotency } from '@/lib/core/idempotency/service'
import type { PollingProviderHandler, PollWebhookContext } from '@/lib/webhooks/polling/types'
import {
markWebhookFailed,
markWebhookSuccess,
resolveOAuthCredential,
updateWebhookProviderConfig,
} from '@/lib/webhooks/polling/utils'
import { processPolledWebhookEvent } from '@/lib/webhooks/processor'
const MAX_FILES_PER_POLL = 50
const MAX_KNOWN_FILE_IDS = 1000
const MAX_PAGES = 10
const DRIVE_API_BASE = 'https://www.googleapis.com/drive/v3'
type DriveEventTypeFilter = '' | 'created' | 'modified' | 'deleted' | 'created_or_modified'
interface GoogleDriveWebhookConfig {
folderId?: string
mimeTypeFilter?: string
includeSharedDrives?: boolean
eventTypeFilter?: DriveEventTypeFilter
maxFilesPerPoll?: number
pageToken?: string
knownFileIds?: string[]
}
interface DriveChangeEntry {
kind: string
type: string
changeType?: string
time: string
removed: boolean
fileId: string
file?: DriveFileMetadata
}
interface DriveFileMetadata {
id: string
name: string
mimeType: string
modifiedTime: string
createdTime?: string
size?: string
webViewLink?: string
parents?: string[]
lastModifyingUser?: { displayName?: string; emailAddress?: string }
shared?: boolean
starred?: boolean
trashed?: boolean
}
export interface GoogleDriveWebhookPayload {
file: DriveFileMetadata | { id: string }
eventType: 'created' | 'modified' | 'deleted'
timestamp: string
}
const FILE_FIELDS = [
'id',
'name',
'mimeType',
'modifiedTime',
'createdTime',
'size',
'webViewLink',
'parents',
'lastModifyingUser',
'shared',
'starred',
'trashed',
].join(',')
export const googleDrivePollingHandler: PollingProviderHandler = {
provider: 'google-drive',
label: 'Google Drive',
async pollWebhook(ctx: PollWebhookContext): Promise<'success' | 'failure'> {
const { webhookData, workflowData, requestId, logger } = ctx
const webhookId = webhookData.id
try {
const accessToken = await resolveOAuthCredential(
webhookData,
'google-drive',
requestId,
logger
)
const config = webhookData.providerConfig as unknown as GoogleDriveWebhookConfig
const now = new Date()
// First poll: get startPageToken and seed state
if (!config.pageToken) {
const startPageToken = await getStartPageToken(accessToken, config, requestId, logger)
await updateWebhookProviderConfig(
webhookId,
{ pageToken: startPageToken, knownFileIds: [] },
logger
)
await markWebhookSuccess(webhookId, logger)
logger.info(
`[${requestId}] First poll for webhook ${webhookId}, seeded pageToken: ${startPageToken}`
)
return 'success'
}
// Fetch changes since last pageToken
const { changes, newStartPageToken } = await fetchChanges(
accessToken,
config,
requestId,
logger
)
if (!changes.length) {
await updateWebhookProviderConfig(webhookId, { pageToken: newStartPageToken }, logger)
await markWebhookSuccess(webhookId, logger)
logger.info(`[${requestId}] No changes found for webhook ${webhookId}`)
return 'success'
}
// Filter changes client-side (folder, MIME type, trashed)
const filteredChanges = filterChanges(changes, config)
if (!filteredChanges.length) {
await updateWebhookProviderConfig(webhookId, { pageToken: newStartPageToken }, logger)
await markWebhookSuccess(webhookId, logger)
logger.info(
`[${requestId}] ${changes.length} changes found but none match filters for webhook ${webhookId}`
)
return 'success'
}
logger.info(
`[${requestId}] Found ${filteredChanges.length} matching changes for webhook ${webhookId}`
)
const { processedCount, failedCount, newKnownFileIds } = await processChanges(
filteredChanges,
config,
webhookData,
workflowData,
requestId,
logger
)
// Update state: new pageToken and rolling knownFileIds
const existingKnownIds = config.knownFileIds || []
const mergedKnownIds = [...new Set([...newKnownFileIds, ...existingKnownIds])].slice(
0,
MAX_KNOWN_FILE_IDS
)
await updateWebhookProviderConfig(
webhookId,
{ pageToken: newStartPageToken, knownFileIds: mergedKnownIds },
logger
)
if (failedCount > 0 && processedCount === 0) {
await markWebhookFailed(webhookId, logger)
logger.warn(
`[${requestId}] All ${failedCount} changes failed to process for webhook ${webhookId}`
)
return 'failure'
}
await markWebhookSuccess(webhookId, logger)
logger.info(
`[${requestId}] Successfully processed ${processedCount} changes for webhook ${webhookId}${failedCount > 0 ? ` (${failedCount} failed)` : ''}`
)
return 'success'
} catch (error) {
logger.error(`[${requestId}] Error processing Google Drive webhook ${webhookId}:`, error)
await markWebhookFailed(webhookId, logger)
return 'failure'
}
},
}
async function getStartPageToken(
accessToken: string,
config: GoogleDriveWebhookConfig,
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<string> {
const params = new URLSearchParams()
if (config.includeSharedDrives) {
params.set('supportsAllDrives', 'true')
}
const url = `${DRIVE_API_BASE}/changes/startPageToken?${params.toString()}`
const response = await fetch(url, {
headers: { Authorization: `Bearer ${accessToken}` },
})
if (!response.ok) {
const errorData = await response.json().catch(() => ({}))
throw new Error(
`Failed to get startPageToken: ${response.status} - ${JSON.stringify(errorData)}`
)
}
const data = await response.json()
return data.startPageToken as string
}
async function fetchChanges(
accessToken: string,
config: GoogleDriveWebhookConfig,
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<{ changes: DriveChangeEntry[]; newStartPageToken: string }> {
const allChanges: DriveChangeEntry[] = []
let currentPageToken = config.pageToken!
let newStartPageToken = currentPageToken
const maxFiles = config.maxFilesPerPoll || MAX_FILES_PER_POLL
let pages = 0
// eslint-disable-next-line no-constant-condition
while (true) {
pages++
const params = new URLSearchParams({
pageToken: currentPageToken,
pageSize: String(Math.min(maxFiles, 100)),
fields: `nextPageToken,newStartPageToken,changes(kind,type,time,removed,fileId,file(${FILE_FIELDS}))`,
restrictToMyDrive: config.includeSharedDrives ? 'false' : 'true',
})
if (config.includeSharedDrives) {
params.set('supportsAllDrives', 'true')
params.set('includeItemsFromAllDrives', 'true')
}
const url = `${DRIVE_API_BASE}/changes?${params.toString()}`
const response = await fetch(url, {
headers: { Authorization: `Bearer ${accessToken}` },
})
if (!response.ok) {
const errorData = await response.json().catch(() => ({}))
throw new Error(`Failed to fetch changes: ${response.status} - ${JSON.stringify(errorData)}`)
}
const data = await response.json()
const changes = (data.changes || []) as DriveChangeEntry[]
allChanges.push(...changes)
if (data.newStartPageToken) {
newStartPageToken = data.newStartPageToken as string
}
// Stop if no more pages or we have enough changes.
// Always use newStartPageToken (not nextPageToken) as the resume point —
// nextPageToken paginates the current query but newStartPageToken is the
// correct cursor for the next poll cycle.
if (!data.nextPageToken || allChanges.length >= maxFiles || pages >= MAX_PAGES) {
break
}
currentPageToken = data.nextPageToken as string
}
return { changes: allChanges.slice(0, maxFiles), newStartPageToken }
}
function filterChanges(
changes: DriveChangeEntry[],
config: GoogleDriveWebhookConfig
): DriveChangeEntry[] {
return changes.filter((change) => {
// Always include removals (deletions)
if (change.removed) return true
const file = change.file
if (!file) return false
// Exclude trashed files
if (file.trashed) return false
// Folder filter: check if file is in the specified folder
if (config.folderId) {
if (!file.parents || !file.parents.includes(config.folderId)) {
return false
}
}
// MIME type filter
if (config.mimeTypeFilter) {
// Support prefix matching (e.g., "image/" matches "image/png", "image/jpeg")
if (config.mimeTypeFilter.endsWith('/')) {
if (!file.mimeType.startsWith(config.mimeTypeFilter)) {
return false
}
} else if (file.mimeType !== config.mimeTypeFilter) {
return false
}
}
return true
})
}
async function processChanges(
changes: DriveChangeEntry[],
config: GoogleDriveWebhookConfig,
webhookData: PollWebhookContext['webhookData'],
workflowData: PollWebhookContext['workflowData'],
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<{ processedCount: number; failedCount: number; newKnownFileIds: string[] }> {
let processedCount = 0
let failedCount = 0
const newKnownFileIds: string[] = []
const knownFileIdsSet = new Set(config.knownFileIds || [])
for (const change of changes) {
// Determine event type before idempotency to avoid caching filter decisions
let eventType: 'created' | 'modified' | 'deleted'
if (change.removed) {
eventType = 'deleted'
} else if (!knownFileIdsSet.has(change.fileId)) {
eventType = 'created'
} else {
eventType = 'modified'
}
// Track file as known regardless of filter (for future create/modify distinction)
if (!change.removed) {
newKnownFileIds.push(change.fileId)
}
// Client-side event type filter — skip before idempotency so filtered events aren't cached
const filter = config.eventTypeFilter
if (filter) {
const skip = filter === 'created_or_modified' ? eventType === 'deleted' : eventType !== filter
if (skip) continue
}
try {
const idempotencyKey = `${webhookData.id}:${change.fileId}:${change.time}`
await pollingIdempotency.executeWithIdempotency('google-drive', idempotencyKey, async () => {
const payload: GoogleDriveWebhookPayload = {
file: change.file || { id: change.fileId },
eventType,
timestamp: new Date().toISOString(),
}
const result = await processPolledWebhookEvent(
webhookData,
workflowData,
payload,
requestId
)
if (!result.success) {
logger.error(
`[${requestId}] Failed to process webhook for file ${change.fileId}:`,
result.statusCode,
result.error
)
throw new Error(`Webhook processing failed (${result.statusCode}): ${result.error}`)
}
return { fileId: change.fileId, processed: true }
})
logger.info(
`[${requestId}] Successfully processed change for file ${change.fileId} for webhook ${webhookData.id}`
)
processedCount++
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(
`[${requestId}] Error processing change for file ${change.fileId}:`,
errorMessage
)
failedCount++
}
}
return { processedCount, failedCount, newKnownFileIds }
}

View File

@@ -0,0 +1,435 @@
import { pollingIdempotency } from '@/lib/core/idempotency/service'
import type { PollingProviderHandler, PollWebhookContext } from '@/lib/webhooks/polling/types'
import {
markWebhookFailed,
markWebhookSuccess,
resolveOAuthCredential,
updateWebhookProviderConfig,
} from '@/lib/webhooks/polling/utils'
import { processPolledWebhookEvent } from '@/lib/webhooks/processor'
const MAX_ROWS_PER_POLL = 100
type ValueRenderOption = 'FORMATTED_VALUE' | 'UNFORMATTED_VALUE' | 'FORMULA'
type DateTimeRenderOption = 'SERIAL_NUMBER' | 'FORMATTED_STRING'
interface GoogleSheetsWebhookConfig {
spreadsheetId: string
sheetName: string
includeHeaders: boolean
valueRenderOption?: ValueRenderOption
dateTimeRenderOption?: DateTimeRenderOption
lastKnownRowCount?: number
lastModifiedTime?: string
lastCheckedTimestamp?: string
maxRowsPerPoll?: number
}
export interface GoogleSheetsWebhookPayload {
row: Record<string, string> | null
rawRow: string[]
headers: string[]
rowNumber: number
spreadsheetId: string
sheetName: string
timestamp: string
}
export const googleSheetsPollingHandler: PollingProviderHandler = {
provider: 'google-sheets',
label: 'Google Sheets',
async pollWebhook(ctx: PollWebhookContext): Promise<'success' | 'failure'> {
const { webhookData, workflowData, requestId, logger } = ctx
const webhookId = webhookData.id
try {
const accessToken = await resolveOAuthCredential(
webhookData,
'google-sheets',
requestId,
logger
)
const config = webhookData.providerConfig as unknown as GoogleSheetsWebhookConfig
const now = new Date()
if (!config?.spreadsheetId || !config?.sheetName) {
logger.error(`[${requestId}] Missing spreadsheetId or sheetName for webhook ${webhookId}`)
await markWebhookFailed(webhookId, logger)
return 'failure'
}
// Pre-check: use Drive API to see if the file was modified since last poll
const skipPoll = await isDriveFileUnchanged(
accessToken,
config.spreadsheetId,
config.lastModifiedTime,
requestId,
logger
)
if (skipPoll) {
await updateWebhookProviderConfig(
webhookId,
{ lastCheckedTimestamp: now.toISOString() },
logger
)
await markWebhookSuccess(webhookId, logger)
logger.info(`[${requestId}] Sheet not modified since last poll for webhook ${webhookId}`)
return 'success'
}
// Get current Drive modifiedTime for state update
const currentModifiedTime = await getDriveFileModifiedTime(
accessToken,
config.spreadsheetId,
logger
)
// Fetch current row count via column A
const currentRowCount = await getDataRowCount(
accessToken,
config.spreadsheetId,
config.sheetName,
requestId,
logger
)
// First poll: seed state, emit nothing
if (config.lastKnownRowCount === undefined) {
await updateWebhookProviderConfig(
webhookId,
{
lastKnownRowCount: currentRowCount,
lastModifiedTime: currentModifiedTime,
lastCheckedTimestamp: now.toISOString(),
},
logger
)
await markWebhookSuccess(webhookId, logger)
logger.info(
`[${requestId}] First poll for webhook ${webhookId}, seeded row count: ${currentRowCount}`
)
return 'success'
}
// Rows deleted or unchanged
if (currentRowCount <= config.lastKnownRowCount) {
if (currentRowCount < config.lastKnownRowCount) {
logger.warn(
`[${requestId}] Row count decreased from ${config.lastKnownRowCount} to ${currentRowCount} for webhook ${webhookId}`
)
}
await updateWebhookProviderConfig(
webhookId,
{
lastKnownRowCount: currentRowCount,
lastModifiedTime: currentModifiedTime,
lastCheckedTimestamp: now.toISOString(),
},
logger
)
await markWebhookSuccess(webhookId, logger)
logger.info(`[${requestId}] No new rows for webhook ${webhookId}`)
return 'success'
}
// New rows detected
const newRowCount = currentRowCount - config.lastKnownRowCount
const maxRows = config.maxRowsPerPoll || MAX_ROWS_PER_POLL
const rowsToFetch = Math.min(newRowCount, maxRows)
const startRow = config.lastKnownRowCount + 1
const endRow = config.lastKnownRowCount + rowsToFetch
logger.info(
`[${requestId}] Found ${newRowCount} new rows for webhook ${webhookId}, processing rows ${startRow}-${endRow}`
)
// Resolve render options
const valueRender = config.valueRenderOption || 'FORMATTED_VALUE'
const dateTimeRender = config.dateTimeRenderOption || 'SERIAL_NUMBER'
// Fetch headers (row 1) if includeHeaders is enabled
let headers: string[] = []
if (config.includeHeaders !== false) {
headers = await fetchHeaderRow(
accessToken,
config.spreadsheetId,
config.sheetName,
valueRender,
dateTimeRender,
requestId,
logger
)
}
// Fetch new rows — startRow/endRow are already 1-indexed sheet row numbers
// because lastKnownRowCount includes the header row
const newRows = await fetchRowRange(
accessToken,
config.spreadsheetId,
config.sheetName,
startRow,
endRow,
valueRender,
dateTimeRender,
requestId,
logger
)
const { processedCount, failedCount } = await processRows(
newRows,
headers,
startRow,
config,
webhookData,
workflowData,
requestId,
logger
)
// Update state: advance row count by the number we fetched (not total new rows)
// so remaining rows are picked up in the next poll.
// When batching (more rows than maxRowsPerPoll), keep the old lastModifiedTime
// so the Drive pre-check doesn't skip remaining rows on the next poll.
const newLastKnownRowCount = config.lastKnownRowCount + rowsToFetch
const hasRemainingRows = rowsToFetch < newRowCount
await updateWebhookProviderConfig(
webhookId,
{
lastKnownRowCount: newLastKnownRowCount,
lastModifiedTime: hasRemainingRows ? config.lastModifiedTime : currentModifiedTime,
lastCheckedTimestamp: now.toISOString(),
},
logger
)
if (failedCount > 0 && processedCount === 0) {
await markWebhookFailed(webhookId, logger)
logger.warn(
`[${requestId}] All ${failedCount} rows failed to process for webhook ${webhookId}`
)
return 'failure'
}
await markWebhookSuccess(webhookId, logger)
logger.info(
`[${requestId}] Successfully processed ${processedCount} rows for webhook ${webhookId}${failedCount > 0 ? ` (${failedCount} failed)` : ''}`
)
return 'success'
} catch (error) {
logger.error(`[${requestId}] Error processing Google Sheets webhook ${webhookId}:`, error)
await markWebhookFailed(webhookId, logger)
return 'failure'
}
},
}
async function isDriveFileUnchanged(
accessToken: string,
spreadsheetId: string,
lastModifiedTime: string | undefined,
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<boolean> {
if (!lastModifiedTime) return false
try {
const currentModifiedTime = await getDriveFileModifiedTime(accessToken, spreadsheetId, logger)
return currentModifiedTime === lastModifiedTime
} catch (error) {
// If Drive check fails, proceed with Sheets API (don't skip)
logger.warn(`[${requestId}] Drive modifiedTime check failed, proceeding with Sheets API`)
return false
}
}
async function getDriveFileModifiedTime(
accessToken: string,
fileId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<string | undefined> {
try {
const response = await fetch(
`https://www.googleapis.com/drive/v3/files/${fileId}?fields=modifiedTime`,
{ headers: { Authorization: `Bearer ${accessToken}` } }
)
if (!response.ok) return undefined
const data = await response.json()
return data.modifiedTime as string | undefined
} catch {
return undefined
}
}
async function getDataRowCount(
accessToken: string,
spreadsheetId: string,
sheetName: string,
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<number> {
const encodedSheet = encodeURIComponent(sheetName)
const url = `https://sheets.googleapis.com/v4/spreadsheets/${spreadsheetId}/values/${encodedSheet}!A:A?majorDimension=COLUMNS&fields=values`
const response = await fetch(url, {
headers: { Authorization: `Bearer ${accessToken}` },
})
if (!response.ok) {
const errorData = await response.json().catch(() => ({}))
throw new Error(
`Failed to fetch row count: ${response.status} ${response.statusText} - ${JSON.stringify(errorData)}`
)
}
const data = await response.json()
// values is [[cell1, cell2, ...]] when majorDimension=COLUMNS
const columnValues = data.values?.[0] as string[] | undefined
return columnValues?.length ?? 0
}
async function fetchHeaderRow(
accessToken: string,
spreadsheetId: string,
sheetName: string,
valueRenderOption: ValueRenderOption,
dateTimeRenderOption: DateTimeRenderOption,
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<string[]> {
const encodedSheet = encodeURIComponent(sheetName)
const params = new URLSearchParams({
fields: 'values',
valueRenderOption,
dateTimeRenderOption,
})
const url = `https://sheets.googleapis.com/v4/spreadsheets/${spreadsheetId}/values/${encodedSheet}!1:1?${params.toString()}`
const response = await fetch(url, {
headers: { Authorization: `Bearer ${accessToken}` },
})
if (!response.ok) {
logger.warn(`[${requestId}] Failed to fetch header row, proceeding without headers`)
return []
}
const data = await response.json()
return (data.values?.[0] as string[]) ?? []
}
async function fetchRowRange(
accessToken: string,
spreadsheetId: string,
sheetName: string,
startRow: number,
endRow: number,
valueRenderOption: ValueRenderOption,
dateTimeRenderOption: DateTimeRenderOption,
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<string[][]> {
const encodedSheet = encodeURIComponent(sheetName)
const params = new URLSearchParams({
fields: 'values',
valueRenderOption,
dateTimeRenderOption,
})
const url = `https://sheets.googleapis.com/v4/spreadsheets/${spreadsheetId}/values/${encodedSheet}!${startRow}:${endRow}?${params.toString()}`
const response = await fetch(url, {
headers: { Authorization: `Bearer ${accessToken}` },
})
if (!response.ok) {
const errorData = await response.json().catch(() => ({}))
throw new Error(
`Failed to fetch rows ${startRow}-${endRow}: ${response.status} ${response.statusText} - ${JSON.stringify(errorData)}`
)
}
const data = await response.json()
return (data.values as string[][]) ?? []
}
async function processRows(
rows: string[][],
headers: string[],
startRowIndex: number,
config: GoogleSheetsWebhookConfig,
webhookData: PollWebhookContext['webhookData'],
workflowData: PollWebhookContext['workflowData'],
requestId: string,
logger: ReturnType<typeof import('@sim/logger').createLogger>
): Promise<{ processedCount: number; failedCount: number }> {
let processedCount = 0
let failedCount = 0
for (let i = 0; i < rows.length; i++) {
const row = rows[i]
const rowNumber = startRowIndex + i // startRowIndex is already the 1-indexed sheet row
try {
await pollingIdempotency.executeWithIdempotency(
'google-sheets',
`${webhookData.id}:${config.spreadsheetId}:${config.sheetName}:row${rowNumber}:${row.join('|')}`,
async () => {
// Map row values to headers
let mappedRow: Record<string, string> | null = null
if (headers.length > 0 && config.includeHeaders !== false) {
mappedRow = {}
for (let j = 0; j < headers.length; j++) {
const header = headers[j] || `Column ${j + 1}`
mappedRow[header] = row[j] ?? ''
}
// Include any extra columns beyond headers
for (let j = headers.length; j < row.length; j++) {
mappedRow[`Column ${j + 1}`] = row[j] ?? ''
}
}
const payload: GoogleSheetsWebhookPayload = {
row: mappedRow,
rawRow: row,
headers,
rowNumber,
spreadsheetId: config.spreadsheetId,
sheetName: config.sheetName,
timestamp: new Date().toISOString(),
}
const result = await processPolledWebhookEvent(
webhookData,
workflowData,
payload,
requestId
)
if (!result.success) {
logger.error(
`[${requestId}] Failed to process webhook for row ${rowNumber}:`,
result.statusCode,
result.error
)
throw new Error(`Webhook processing failed (${result.statusCode}): ${result.error}`)
}
return { rowNumber, processed: true }
}
)
logger.info(
`[${requestId}] Successfully processed row ${rowNumber} for webhook ${webhookData.id}`
)
processedCount++
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Error processing row ${rowNumber}:`, errorMessage)
failedCount++
}
}
return { processedCount, failedCount }
}

View File

@@ -1,4 +1,7 @@
import { gmailPollingHandler } from '@/lib/webhooks/polling/gmail'
import { googleCalendarPollingHandler } from '@/lib/webhooks/polling/google-calendar'
import { googleDrivePollingHandler } from '@/lib/webhooks/polling/google-drive'
import { googleSheetsPollingHandler } from '@/lib/webhooks/polling/google-sheets'
import { imapPollingHandler } from '@/lib/webhooks/polling/imap'
import { outlookPollingHandler } from '@/lib/webhooks/polling/outlook'
import { rssPollingHandler } from '@/lib/webhooks/polling/rss'
@@ -6,6 +9,9 @@ import type { PollingProviderHandler } from '@/lib/webhooks/polling/types'
const POLLING_HANDLERS: Record<string, PollingProviderHandler> = {
gmail: gmailPollingHandler,
'google-calendar': googleCalendarPollingHandler,
'google-drive': googleDrivePollingHandler,
'google-sheets': googleSheetsPollingHandler,
imap: imapPollingHandler,
outlook: outlookPollingHandler,
rss: rssPollingHandler,

View File

@@ -42,7 +42,15 @@ export const MAX_CONSECUTIVE_FAILURES = 100
* Used to route execution: polling providers use the full job queue
* (Trigger.dev), non-polling providers execute inline.
*/
export const POLLING_PROVIDERS = new Set(['gmail', 'outlook', 'rss', 'imap'])
export const POLLING_PROVIDERS = new Set([
'gmail',
'google-calendar',
'google-drive',
'google-sheets',
'imap',
'outlook',
'rss',
])
export function isPollingWebhookProvider(provider: string): boolean {
return POLLING_PROVIDERS.has(provider)

View File

@@ -0,0 +1 @@
export { googleCalendarPollingTrigger } from './poller'

View File

@@ -0,0 +1,200 @@
import { createLogger } from '@sim/logger'
import { GoogleCalendarIcon } from '@/components/icons'
import { isCredentialSetValue } from '@/executor/constants'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import type { TriggerConfig } from '@/triggers/types'
const logger = createLogger('GoogleCalendarPollingTrigger')
const DEFAULT_CALENDARS = [{ id: 'primary', label: 'Primary Calendar' }]
export const googleCalendarPollingTrigger: TriggerConfig = {
id: 'google_calendar_poller',
name: 'Google Calendar Event Trigger',
provider: 'google-calendar',
description: 'Triggers when events are created, updated, or cancelled in Google Calendar',
version: '1.0.0',
icon: GoogleCalendarIcon,
polling: true,
subBlocks: [
{
id: 'triggerCredentials',
title: 'Credentials',
type: 'oauth-input',
description: 'Connect your Google account to access Google Calendar.',
serviceId: 'google-calendar',
requiredScopes: [],
required: true,
mode: 'trigger',
supportsCredentialSets: true,
},
{
id: 'calendarId',
title: 'Calendar',
type: 'dropdown',
placeholder: 'Select a calendar',
description: 'The calendar to monitor for event changes.',
required: false,
defaultValue: 'primary',
options: [],
fetchOptions: async (blockId: string) => {
const credentialId = useSubBlockStore.getState().getValue(blockId, 'triggerCredentials') as
| string
| null
if (!credentialId) {
throw new Error('No Google Calendar credential selected')
}
// Credential sets can't fetch user-specific calendars
if (isCredentialSetValue(credentialId)) {
return DEFAULT_CALENDARS
}
try {
const response = await fetch(
`/api/tools/google_calendar/calendars?credentialId=${credentialId}`
)
if (!response.ok) {
throw new Error('Failed to fetch calendars')
}
const data = await response.json()
if (data.calendars && Array.isArray(data.calendars)) {
return data.calendars.map(
(calendar: { id: string; summary: string; primary: boolean }) => ({
id: calendar.id,
label: calendar.primary ? `${calendar.summary} (Primary)` : calendar.summary,
})
)
}
return DEFAULT_CALENDARS
} catch (error) {
logger.error('Error fetching calendars:', error)
throw error
}
},
dependsOn: ['triggerCredentials'],
mode: 'trigger',
},
{
id: 'eventTypeFilter',
title: 'Event Type',
type: 'dropdown',
options: [
{ id: '', label: 'All Events' },
{ id: 'created', label: 'Created' },
{ id: 'updated', label: 'Updated' },
{ id: 'cancelled', label: 'Cancelled' },
],
defaultValue: '',
description: 'Only trigger for specific event types. Defaults to all events.',
required: false,
mode: 'trigger',
},
{
id: 'searchTerm',
title: 'Search Term',
type: 'short-input',
placeholder: 'e.g., team meeting, standup',
description:
'Optional: Filter events by text match across title, description, location, and attendees.',
required: false,
mode: 'trigger',
},
{
id: 'triggerSave',
title: '',
type: 'trigger-save',
hideFromPreview: true,
mode: 'trigger',
triggerId: 'google_calendar_poller',
},
{
id: 'triggerInstructions',
title: 'Setup Instructions',
hideFromPreview: true,
type: 'text',
defaultValue: [
'Connect your Google account using OAuth credentials',
'Select the calendar to monitor (defaults to your primary calendar)',
'The system will automatically detect new, updated, and cancelled events',
]
.map(
(instruction, index) =>
`<div class="mb-3"><strong>${index + 1}.</strong> ${instruction}</div>`
)
.join(''),
mode: 'trigger',
},
],
outputs: {
event: {
id: {
type: 'string',
description: 'Calendar event ID',
},
status: {
type: 'string',
description: 'Event status (confirmed, tentative, cancelled)',
},
eventType: {
type: 'string',
description: 'Change type: "created", "updated", or "cancelled"',
},
summary: {
type: 'string',
description: 'Event title',
},
eventDescription: {
type: 'string',
description: 'Event description',
},
location: {
type: 'string',
description: 'Event location',
},
htmlLink: {
type: 'string',
description: 'Link to event in Google Calendar',
},
start: {
type: 'json',
description: 'Event start time',
},
end: {
type: 'json',
description: 'Event end time',
},
created: {
type: 'string',
description: 'Event creation time',
},
updated: {
type: 'string',
description: 'Event last updated time',
},
attendees: {
type: 'json',
description: 'Event attendees',
},
creator: {
type: 'json',
description: 'Event creator',
},
organizer: {
type: 'json',
description: 'Event organizer',
},
},
calendarId: {
type: 'string',
description: 'Calendar ID',
},
timestamp: {
type: 'string',
description: 'Event processing timestamp in ISO format',
},
},
}

View File

@@ -0,0 +1 @@
export { googleSheetsPollingTrigger } from './poller'

View File

@@ -0,0 +1,185 @@
import { createLogger } from '@sim/logger'
import { GoogleSheetsIcon } from '@/components/icons'
import { isCredentialSetValue } from '@/executor/constants'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import type { TriggerConfig } from '@/triggers/types'
const logger = createLogger('GoogleSheetsPollingTrigger')
export const googleSheetsPollingTrigger: TriggerConfig = {
id: 'google_sheets_poller',
name: 'Google Sheets New Row Trigger',
provider: 'google-sheets',
description: 'Triggers when new rows are added to a Google Sheet',
version: '1.0.0',
icon: GoogleSheetsIcon,
polling: true,
subBlocks: [
{
id: 'triggerCredentials',
title: 'Credentials',
type: 'oauth-input',
description: 'Connect your Google account to access Google Sheets.',
serviceId: 'google-sheets',
requiredScopes: [],
required: true,
mode: 'trigger',
supportsCredentialSets: true,
},
{
id: 'spreadsheetId',
title: 'Spreadsheet ID',
type: 'short-input',
placeholder: 'e.g., 1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgVE2upms',
description:
'The spreadsheet ID from the URL: docs.google.com/spreadsheets/d/{spreadsheetId}/edit',
required: true,
mode: 'trigger',
},
{
id: 'sheetName',
title: 'Sheet Tab',
type: 'dropdown',
placeholder: 'Select a sheet tab',
description: 'The sheet tab to monitor for new rows.',
required: true,
options: [],
fetchOptions: async (blockId: string) => {
const subBlockStore = useSubBlockStore.getState()
const credentialId = subBlockStore.getValue(blockId, 'triggerCredentials') as string | null
const spreadsheetId = subBlockStore.getValue(blockId, 'spreadsheetId') as string | null
if (!credentialId) {
throw new Error('No Google Sheets credential selected')
}
if (!spreadsheetId) {
throw new Error('No spreadsheet ID provided')
}
// Credential sets can't fetch user-specific data; return empty to allow manual entry
if (isCredentialSetValue(credentialId)) {
return []
}
try {
const response = await fetch(
`/api/tools/google_sheets/sheets?credentialId=${credentialId}&spreadsheetId=${spreadsheetId}`
)
if (!response.ok) {
throw new Error('Failed to fetch sheet tabs')
}
const data = await response.json()
if (data.sheets && Array.isArray(data.sheets)) {
return data.sheets.map((sheet: { id: string; name: string }) => ({
id: sheet.id,
label: sheet.name,
}))
}
return []
} catch (error) {
logger.error('Error fetching sheet tabs:', error)
throw error
}
},
dependsOn: ['triggerCredentials', 'spreadsheetId'],
mode: 'trigger',
},
{
id: 'includeHeaders',
title: 'Map Row Values to Headers',
type: 'switch',
defaultValue: true,
description:
'When enabled, each row is returned as a key-value object mapped to column headers from row 1.',
required: false,
mode: 'trigger',
},
{
id: 'valueRenderOption',
title: 'Value Render',
type: 'dropdown',
options: [
{ id: 'FORMATTED_VALUE', label: 'Formatted Value' },
{ id: 'UNFORMATTED_VALUE', label: 'Unformatted Value' },
{ id: 'FORMULA', label: 'Formula' },
],
defaultValue: 'FORMATTED_VALUE',
description:
'How values are rendered. Formatted returns display strings, Unformatted returns raw numbers/booleans, Formula returns the formula text.',
required: false,
mode: 'trigger',
},
{
id: 'dateTimeRenderOption',
title: 'Date/Time Render',
type: 'dropdown',
options: [
{ id: 'SERIAL_NUMBER', label: 'Serial Number' },
{ id: 'FORMATTED_STRING', label: 'Formatted String' },
],
defaultValue: 'SERIAL_NUMBER',
description:
'How dates and times are rendered. Only applies when Value Render is not "Formatted Value".',
required: false,
mode: 'trigger',
},
{
id: 'triggerSave',
title: '',
type: 'trigger-save',
hideFromPreview: true,
mode: 'trigger',
triggerId: 'google_sheets_poller',
},
{
id: 'triggerInstructions',
title: 'Setup Instructions',
hideFromPreview: true,
type: 'text',
defaultValue: [
'Connect your Google account using OAuth credentials',
'Enter the Spreadsheet ID from your Google Sheets URL',
'Select the sheet tab to monitor',
'The system will automatically detect new rows appended to the sheet',
]
.map(
(instruction, index) =>
`<div class="mb-3"><strong>${index + 1}.</strong> ${instruction}</div>`
)
.join(''),
mode: 'trigger',
},
],
outputs: {
row: {
type: 'json',
description: 'Row data mapped to column headers (when header mapping is enabled)',
},
rawRow: {
type: 'json',
description: 'Raw row values as an array',
},
headers: {
type: 'json',
description: 'Column headers from row 1',
},
rowNumber: {
type: 'number',
description: 'The 1-based row number of the new row',
},
spreadsheetId: {
type: 'string',
description: 'The spreadsheet ID',
},
sheetName: {
type: 'string',
description: 'The sheet tab name',
},
timestamp: {
type: 'string',
description: 'Event timestamp in ISO format',
},
},
}

View File

@@ -0,0 +1 @@
export { googleDrivePollingTrigger } from './poller'

View File

@@ -0,0 +1,167 @@
import { createLogger } from '@sim/logger'
import { GoogleDriveIcon } from '@/components/icons'
import type { TriggerConfig } from '@/triggers/types'
const logger = createLogger('GoogleDrivePollingTrigger')
const MIME_TYPE_OPTIONS = [
{ id: '', label: 'All Files' },
{ id: 'application/vnd.google-apps.document', label: 'Google Docs' },
{ id: 'application/vnd.google-apps.spreadsheet', label: 'Google Sheets' },
{ id: 'application/vnd.google-apps.presentation', label: 'Google Slides' },
{ id: 'application/pdf', label: 'PDFs' },
{ id: 'image/', label: 'Images' },
{ id: 'application/vnd.google-apps.folder', label: 'Folders' },
] as const
export const googleDrivePollingTrigger: TriggerConfig = {
id: 'google_drive_poller',
name: 'Google Drive File Trigger',
provider: 'google-drive',
description: 'Triggers when files are created, modified, or deleted in Google Drive',
version: '1.0.0',
icon: GoogleDriveIcon,
polling: true,
subBlocks: [
{
id: 'triggerCredentials',
title: 'Credentials',
type: 'oauth-input',
description: 'Connect your Google account to access Google Drive.',
serviceId: 'google-drive',
requiredScopes: [],
required: true,
mode: 'trigger',
supportsCredentialSets: true,
},
{
id: 'folderId',
title: 'Folder ID',
type: 'short-input',
placeholder: 'Leave empty to monitor entire Drive',
description:
'Optional: The folder ID from the Google Drive URL to monitor. Leave empty to monitor all files.',
required: false,
mode: 'trigger',
},
{
id: 'mimeTypeFilter',
title: 'File Type Filter',
type: 'dropdown',
options: [...MIME_TYPE_OPTIONS],
defaultValue: '',
description: 'Optional: Only trigger for specific file types.',
required: false,
mode: 'trigger',
},
{
id: 'eventTypeFilter',
title: 'Event Type',
type: 'dropdown',
options: [
{ id: '', label: 'All Changes' },
{ id: 'created', label: 'File Created' },
{ id: 'modified', label: 'File Modified' },
{ id: 'deleted', label: 'File Deleted' },
{ id: 'created_or_modified', label: 'Created or Modified' },
],
defaultValue: '',
description: 'Only trigger for specific change types. Defaults to all changes.',
required: false,
mode: 'trigger',
},
{
id: 'includeSharedDrives',
title: 'Include Shared Drives',
type: 'switch',
defaultValue: false,
description: 'Include files from shared (team) drives.',
required: false,
mode: 'trigger',
},
{
id: 'triggerSave',
title: '',
type: 'trigger-save',
hideFromPreview: true,
mode: 'trigger',
triggerId: 'google_drive_poller',
},
{
id: 'triggerInstructions',
title: 'Setup Instructions',
hideFromPreview: true,
type: 'text',
defaultValue: [
'Connect your Google account using OAuth credentials',
'Optionally specify a folder ID to monitor a specific folder',
'Optionally filter by file type',
'The system will automatically detect new, modified, and deleted files',
]
.map(
(instruction, index) =>
`<div class="mb-3"><strong>${index + 1}.</strong> ${instruction}</div>`
)
.join(''),
mode: 'trigger',
},
],
outputs: {
file: {
id: {
type: 'string',
description: 'Google Drive file ID',
},
name: {
type: 'string',
description: 'File name',
},
mimeType: {
type: 'string',
description: 'File MIME type',
},
modifiedTime: {
type: 'string',
description: 'Last modified time (ISO)',
},
createdTime: {
type: 'string',
description: 'File creation time (ISO)',
},
size: {
type: 'string',
description: 'File size in bytes',
},
webViewLink: {
type: 'string',
description: 'URL to view file in browser',
},
parents: {
type: 'json',
description: 'Parent folder IDs',
},
lastModifyingUser: {
type: 'json',
description: 'User who last modified the file',
},
shared: {
type: 'boolean',
description: 'Whether file is shared',
},
starred: {
type: 'boolean',
description: 'Whether file is starred',
},
},
eventType: {
type: 'string',
description: 'Change type: "created", "modified", or "deleted"',
},
timestamp: {
type: 'string',
description: 'Event timestamp in ISO format',
},
},
}

View File

@@ -90,6 +90,9 @@ import {
} from '@/triggers/github'
import { gmailPollingTrigger } from '@/triggers/gmail'
import { gongCallCompletedTrigger, gongWebhookTrigger } from '@/triggers/gong'
import { googleDrivePollingTrigger } from '@/triggers/google_drive'
import { googleCalendarPollingTrigger } from '@/triggers/google-calendar'
import { googleSheetsPollingTrigger } from '@/triggers/google-sheets'
import { googleFormsWebhookTrigger } from '@/triggers/googleforms'
import {
grainHighlightCreatedTrigger,
@@ -359,6 +362,9 @@ export const TRIGGER_REGISTRY: TriggerRegistry = {
fathom_new_meeting: fathomNewMeetingTrigger,
fathom_webhook: fathomWebhookTrigger,
gmail_poller: gmailPollingTrigger,
google_calendar_poller: googleCalendarPollingTrigger,
google_drive_poller: googleDrivePollingTrigger,
google_sheets_poller: googleSheetsPollingTrigger,
gong_call_completed: gongCallCompletedTrigger,
gong_webhook: gongWebhookTrigger,
grain_webhook: grainWebhookTrigger,

View File

@@ -968,6 +968,33 @@ cronjobs:
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 1
googleSheetsWebhookPoll:
enabled: true
name: google-sheets-webhook-poll
schedule: "*/1 * * * *"
path: "/api/webhooks/poll/google-sheets"
concurrencyPolicy: Forbid
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 1
googleDriveWebhookPoll:
enabled: true
name: google-drive-webhook-poll
schedule: "*/1 * * * *"
path: "/api/webhooks/poll/google-drive"
concurrencyPolicy: Forbid
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 1
googleCalendarWebhookPoll:
enabled: true
name: google-calendar-webhook-poll
schedule: "*/1 * * * *"
path: "/api/webhooks/poll/google-calendar"
concurrencyPolicy: Forbid
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 1
renewSubscriptions:
enabled: true
name: renew-subscriptions