mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-08 22:48:14 -05:00
feat(triggers): added rss feed trigger & poller (#2267)
This commit is contained in:
@@ -4170,3 +4170,32 @@ export function DuckDuckGoIcon(props: SVGProps<SVGSVGElement>) {
|
||||
</svg>
|
||||
)
|
||||
}
|
||||
|
||||
export function RssIcon(props: SVGProps<SVGSVGElement>) {
|
||||
return (
|
||||
<svg
|
||||
{...props}
|
||||
width='24'
|
||||
height='24'
|
||||
viewBox='0 0 24 24'
|
||||
fill='none'
|
||||
xmlns='http://www.w3.org/2000/svg'
|
||||
>
|
||||
<path
|
||||
d='M4 11C6.38695 11 8.67613 11.9482 10.364 13.636C12.0518 15.3239 13 17.6131 13 20'
|
||||
stroke='currentColor'
|
||||
strokeWidth='2'
|
||||
strokeLinecap='round'
|
||||
strokeLinejoin='round'
|
||||
/>
|
||||
<path
|
||||
d='M4 4C8.24346 4 12.3131 5.68571 15.3137 8.68629C18.3143 11.6869 20 15.7565 20 20'
|
||||
stroke='currentColor'
|
||||
strokeWidth='2'
|
||||
strokeLinecap='round'
|
||||
strokeLinejoin='round'
|
||||
/>
|
||||
<circle cx='5' cy='19' r='1' fill='currentColor' />
|
||||
</svg>
|
||||
)
|
||||
}
|
||||
|
||||
@@ -30,6 +30,9 @@ Use the Start block for everything originating from the editor, deploy-to-API, o
|
||||
<Card title="Schedule" href="/triggers/schedule">
|
||||
Cron or interval based execution
|
||||
</Card>
|
||||
<Card title="RSS Feed" href="/triggers/rss">
|
||||
Monitor RSS and Atom feeds for new content
|
||||
</Card>
|
||||
</Cards>
|
||||
|
||||
## Quick Comparison
|
||||
@@ -39,6 +42,7 @@ Use the Start block for everything originating from the editor, deploy-to-API, o
|
||||
| **Start** | Editor runs, deploy-to-API requests, or chat messages |
|
||||
| **Schedule** | Timer managed in schedule block |
|
||||
| **Webhook** | On inbound HTTP request |
|
||||
| **RSS Feed** | New item published to feed |
|
||||
|
||||
> The Start block always exposes `input`, `conversationId`, and `files` fields. Add custom fields to the input format for additional structured data.
|
||||
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
{
|
||||
"pages": ["index", "start", "schedule", "webhook"]
|
||||
"pages": ["index", "start", "schedule", "webhook", "rss"]
|
||||
}
|
||||
|
||||
49
apps/docs/content/docs/en/triggers/rss.mdx
Normal file
49
apps/docs/content/docs/en/triggers/rss.mdx
Normal file
@@ -0,0 +1,49 @@
|
||||
---
|
||||
title: RSS Feed
|
||||
---
|
||||
|
||||
import { Callout } from 'fumadocs-ui/components/callout'
|
||||
import { Image } from '@/components/ui/image'
|
||||
|
||||
The RSS Feed block monitors RSS and Atom feeds – when new items are published, your workflow triggers automatically.
|
||||
|
||||
<div className="flex justify-center">
|
||||
<Image
|
||||
src="/static/blocks/rss.png"
|
||||
alt="RSS Feed Block"
|
||||
width={500}
|
||||
height={400}
|
||||
className="my-6"
|
||||
/>
|
||||
</div>
|
||||
|
||||
## Configuration
|
||||
|
||||
1. **Add RSS Feed Block** - Drag the RSS Feed block to start your workflow
|
||||
2. **Enter Feed URL** - Paste the URL of any RSS or Atom feed
|
||||
3. **Deploy** - Deploy your workflow to activate polling
|
||||
|
||||
Once deployed, the feed is checked every minute for new items.
|
||||
|
||||
## Output Fields
|
||||
|
||||
| Field | Type | Description |
|
||||
|-------|------|-------------|
|
||||
| `title` | string | Item title |
|
||||
| `link` | string | Item link |
|
||||
| `pubDate` | string | Publication date |
|
||||
| `item` | object | Raw item with all fields |
|
||||
| `feed` | object | Raw feed metadata |
|
||||
|
||||
Access mapped fields directly (`<rss.title>`) or use the raw objects for any field (`<rss.item.author>`, `<rss.feed.language>`).
|
||||
|
||||
## Use Cases
|
||||
|
||||
- **Content monitoring** - Track blogs, news sites, or competitor updates
|
||||
- **Podcast automation** - Trigger workflows when new episodes drop
|
||||
- **Release tracking** - Monitor GitHub releases, changelogs, or product updates
|
||||
- **Social aggregation** - Collect content from platforms that expose RSS feeds
|
||||
|
||||
<Callout>
|
||||
RSS triggers only fire for items published after you save the trigger. Existing feed items are not processed.
|
||||
</Callout>
|
||||
BIN
apps/docs/public/static/blocks/rss.png
Normal file
BIN
apps/docs/public/static/blocks/rss.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 19 KiB |
66
apps/sim/app/api/webhooks/poll/rss/route.ts
Normal file
66
apps/sim/app/api/webhooks/poll/rss/route.ts
Normal file
@@ -0,0 +1,66 @@
|
||||
import { nanoid } from 'nanoid'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { verifyCronAuth } from '@/lib/auth/internal'
|
||||
import { acquireLock, releaseLock } from '@/lib/core/config/redis'
|
||||
import { createLogger } from '@/lib/logs/console/logger'
|
||||
import { pollRssWebhooks } from '@/lib/webhooks/rss-polling-service'
|
||||
|
||||
const logger = createLogger('RssPollingAPI')
|
||||
|
||||
export const dynamic = 'force-dynamic'
|
||||
export const maxDuration = 180 // Allow up to 3 minutes for polling to complete
|
||||
|
||||
const LOCK_KEY = 'rss-polling-lock'
|
||||
const LOCK_TTL_SECONDS = 180 // Same as maxDuration (3 min)
|
||||
|
||||
export async function GET(request: NextRequest) {
|
||||
const requestId = nanoid()
|
||||
logger.info(`RSS webhook polling triggered (${requestId})`)
|
||||
|
||||
let lockValue: string | undefined
|
||||
|
||||
try {
|
||||
const authError = verifyCronAuth(request, 'RSS webhook polling')
|
||||
if (authError) {
|
||||
return authError
|
||||
}
|
||||
|
||||
lockValue = requestId
|
||||
const locked = await acquireLock(LOCK_KEY, lockValue, LOCK_TTL_SECONDS)
|
||||
|
||||
if (!locked) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: true,
|
||||
message: 'Polling already in progress – skipped',
|
||||
requestId,
|
||||
status: 'skip',
|
||||
},
|
||||
{ status: 202 }
|
||||
)
|
||||
}
|
||||
|
||||
const results = await pollRssWebhooks()
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
message: 'RSS polling completed',
|
||||
requestId,
|
||||
status: 'completed',
|
||||
...results,
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error(`Error during RSS polling (${requestId}):`, error)
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
message: 'RSS polling failed',
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
requestId,
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
} finally {
|
||||
await releaseLock(LOCK_KEY).catch(() => {})
|
||||
}
|
||||
}
|
||||
@@ -544,6 +544,43 @@ export async function POST(request: NextRequest) {
|
||||
}
|
||||
// --- End Outlook specific logic ---
|
||||
|
||||
// --- RSS webhook setup ---
|
||||
if (savedWebhook && provider === 'rss') {
|
||||
logger.info(`[${requestId}] RSS provider detected. Setting up RSS webhook configuration.`)
|
||||
try {
|
||||
const { configureRssPolling } = await import('@/lib/webhooks/utils.server')
|
||||
const success = await configureRssPolling(savedWebhook, requestId)
|
||||
|
||||
if (!success) {
|
||||
logger.error(`[${requestId}] Failed to configure RSS polling, rolling back webhook`)
|
||||
await db.delete(webhook).where(eq(webhook.id, savedWebhook.id))
|
||||
return NextResponse.json(
|
||||
{
|
||||
error: 'Failed to configure RSS polling',
|
||||
details: 'Please try again',
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
}
|
||||
|
||||
logger.info(`[${requestId}] Successfully configured RSS polling`)
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
`[${requestId}] Error setting up RSS webhook configuration, rolling back webhook`,
|
||||
err
|
||||
)
|
||||
await db.delete(webhook).where(eq(webhook.id, savedWebhook.id))
|
||||
return NextResponse.json(
|
||||
{
|
||||
error: 'Failed to configure RSS webhook',
|
||||
details: err instanceof Error ? err.message : 'Unknown error',
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
}
|
||||
}
|
||||
// --- End RSS specific logic ---
|
||||
|
||||
const status = targetWebhookId ? 200 : 201
|
||||
return NextResponse.json({ webhook: savedWebhook }, { status })
|
||||
} catch (error: any) {
|
||||
|
||||
36
apps/sim/blocks/blocks/rss.ts
Normal file
36
apps/sim/blocks/blocks/rss.ts
Normal file
@@ -0,0 +1,36 @@
|
||||
import { RssIcon } from '@/components/icons'
|
||||
import type { BlockConfig } from '@/blocks/types'
|
||||
import { getTrigger } from '@/triggers'
|
||||
|
||||
export const RssBlock: BlockConfig = {
|
||||
type: 'rss',
|
||||
name: 'RSS Feed',
|
||||
description: 'Monitor RSS feeds and trigger workflows when new items are published',
|
||||
longDescription:
|
||||
'Subscribe to any RSS or Atom feed and automatically trigger your workflow when new content is published. Perfect for monitoring blogs, news sites, podcasts, and any content that publishes an RSS feed.',
|
||||
category: 'triggers',
|
||||
bgColor: '#F97316',
|
||||
icon: RssIcon,
|
||||
triggerAllowed: true,
|
||||
|
||||
subBlocks: [...getTrigger('rss_poller').subBlocks],
|
||||
|
||||
tools: {
|
||||
access: [], // Trigger-only for now
|
||||
},
|
||||
|
||||
inputs: {},
|
||||
|
||||
outputs: {
|
||||
title: { type: 'string', description: 'Item title' },
|
||||
link: { type: 'string', description: 'Item link' },
|
||||
pubDate: { type: 'string', description: 'Publication date' },
|
||||
item: { type: 'json', description: 'Raw item object with all fields' },
|
||||
feed: { type: 'json', description: 'Raw feed object with all fields' },
|
||||
},
|
||||
|
||||
triggers: {
|
||||
enabled: true,
|
||||
available: ['rss_poller'],
|
||||
},
|
||||
}
|
||||
@@ -89,6 +89,7 @@ import { RedditBlock } from '@/blocks/blocks/reddit'
|
||||
import { ResendBlock } from '@/blocks/blocks/resend'
|
||||
import { ResponseBlock } from '@/blocks/blocks/response'
|
||||
import { RouterBlock } from '@/blocks/blocks/router'
|
||||
import { RssBlock } from '@/blocks/blocks/rss'
|
||||
import { S3Block } from '@/blocks/blocks/s3'
|
||||
import { SalesforceBlock } from '@/blocks/blocks/salesforce'
|
||||
import { ScheduleBlock } from '@/blocks/blocks/schedule'
|
||||
@@ -229,6 +230,7 @@ export const registry: Record<string, BlockConfig> = {
|
||||
reddit: RedditBlock,
|
||||
resend: ResendBlock,
|
||||
response: ResponseBlock,
|
||||
rss: RssBlock,
|
||||
router: RouterBlock,
|
||||
s3: S3Block,
|
||||
salesforce: SalesforceBlock,
|
||||
|
||||
@@ -4170,3 +4170,32 @@ export function DuckDuckGoIcon(props: SVGProps<SVGSVGElement>) {
|
||||
</svg>
|
||||
)
|
||||
}
|
||||
|
||||
export function RssIcon(props: SVGProps<SVGSVGElement>) {
|
||||
return (
|
||||
<svg
|
||||
{...props}
|
||||
width='24'
|
||||
height='24'
|
||||
viewBox='0 0 24 24'
|
||||
fill='none'
|
||||
xmlns='http://www.w3.org/2000/svg'
|
||||
>
|
||||
<path
|
||||
d='M4 11C6.38695 11 8.67613 11.9482 10.364 13.636C12.0518 15.3239 13 17.6131 13 20'
|
||||
stroke='currentColor'
|
||||
strokeWidth='2'
|
||||
strokeLinecap='round'
|
||||
strokeLinejoin='round'
|
||||
/>
|
||||
<path
|
||||
d='M4 4C8.24346 4 12.3131 5.68571 15.3137 8.68629C18.3143 11.6869 20 15.7565 20 20'
|
||||
stroke='currentColor'
|
||||
strokeWidth='2'
|
||||
strokeLinecap='round'
|
||||
strokeLinejoin='round'
|
||||
/>
|
||||
<circle cx='5' cy='19' r='1' fill='currentColor' />
|
||||
</svg>
|
||||
)
|
||||
}
|
||||
|
||||
414
apps/sim/lib/webhooks/rss-polling-service.ts
Normal file
414
apps/sim/lib/webhooks/rss-polling-service.ts
Normal file
@@ -0,0 +1,414 @@
|
||||
import { db } from '@sim/db'
|
||||
import { webhook, workflow } from '@sim/db/schema'
|
||||
import { and, eq, sql } from 'drizzle-orm'
|
||||
import { nanoid } from 'nanoid'
|
||||
import Parser from 'rss-parser'
|
||||
import { pollingIdempotency } from '@/lib/core/idempotency/service'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
import { createLogger } from '@/lib/logs/console/logger'
|
||||
|
||||
const logger = createLogger('RssPollingService')
|
||||
|
||||
const MAX_CONSECUTIVE_FAILURES = 10
|
||||
const MAX_GUIDS_TO_TRACK = 100 // Track recent guids to prevent duplicates
|
||||
|
||||
interface RssWebhookConfig {
|
||||
feedUrl: string
|
||||
lastCheckedTimestamp?: string
|
||||
lastSeenGuids?: string[]
|
||||
etag?: string
|
||||
lastModified?: string
|
||||
}
|
||||
|
||||
interface RssItem {
|
||||
title?: string
|
||||
link?: string
|
||||
pubDate?: string
|
||||
guid?: string
|
||||
description?: string
|
||||
content?: string
|
||||
contentSnippet?: string
|
||||
author?: string
|
||||
creator?: string
|
||||
categories?: string[]
|
||||
enclosure?: {
|
||||
url: string
|
||||
type?: string
|
||||
length?: string | number
|
||||
}
|
||||
isoDate?: string
|
||||
[key: string]: any
|
||||
}
|
||||
|
||||
interface RssFeed {
|
||||
title?: string
|
||||
link?: string
|
||||
description?: string
|
||||
items: RssItem[]
|
||||
}
|
||||
|
||||
export interface RssWebhookPayload {
|
||||
item: RssItem
|
||||
feed: {
|
||||
title?: string
|
||||
link?: string
|
||||
description?: string
|
||||
}
|
||||
timestamp: string
|
||||
}
|
||||
|
||||
const parser = new Parser({
|
||||
timeout: 30000,
|
||||
headers: {
|
||||
'User-Agent': 'SimStudio/1.0 RSS Poller',
|
||||
},
|
||||
})
|
||||
|
||||
async function markWebhookFailed(webhookId: string) {
|
||||
try {
|
||||
const result = await db
|
||||
.update(webhook)
|
||||
.set({
|
||||
failedCount: sql`COALESCE(${webhook.failedCount}, 0) + 1`,
|
||||
lastFailedAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(webhook.id, webhookId))
|
||||
.returning({ failedCount: webhook.failedCount })
|
||||
|
||||
const newFailedCount = result[0]?.failedCount || 0
|
||||
const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES
|
||||
|
||||
if (shouldDisable) {
|
||||
await db
|
||||
.update(webhook)
|
||||
.set({
|
||||
isActive: false,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(webhook.id, webhookId))
|
||||
|
||||
logger.warn(
|
||||
`Webhook ${webhookId} auto-disabled after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`
|
||||
)
|
||||
}
|
||||
} catch (err) {
|
||||
logger.error(`Failed to mark webhook ${webhookId} as failed:`, err)
|
||||
}
|
||||
}
|
||||
|
||||
async function markWebhookSuccess(webhookId: string) {
|
||||
try {
|
||||
await db
|
||||
.update(webhook)
|
||||
.set({
|
||||
failedCount: 0,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(webhook.id, webhookId))
|
||||
} catch (err) {
|
||||
logger.error(`Failed to mark webhook ${webhookId} as successful:`, err)
|
||||
}
|
||||
}
|
||||
|
||||
export async function pollRssWebhooks() {
|
||||
logger.info('Starting RSS webhook polling')
|
||||
|
||||
try {
|
||||
const activeWebhooksResult = await db
|
||||
.select({ webhook })
|
||||
.from(webhook)
|
||||
.innerJoin(workflow, eq(webhook.workflowId, workflow.id))
|
||||
.where(
|
||||
and(eq(webhook.provider, 'rss'), eq(webhook.isActive, true), eq(workflow.isDeployed, true))
|
||||
)
|
||||
|
||||
const activeWebhooks = activeWebhooksResult.map((r) => r.webhook)
|
||||
|
||||
if (!activeWebhooks.length) {
|
||||
logger.info('No active RSS webhooks found')
|
||||
return { total: 0, successful: 0, failed: 0, details: [] }
|
||||
}
|
||||
|
||||
logger.info(`Found ${activeWebhooks.length} active RSS webhooks`)
|
||||
|
||||
const CONCURRENCY = 10
|
||||
const running: Promise<void>[] = []
|
||||
let successCount = 0
|
||||
let failureCount = 0
|
||||
|
||||
const enqueue = async (webhookData: (typeof activeWebhooks)[number]) => {
|
||||
const webhookId = webhookData.id
|
||||
const requestId = nanoid()
|
||||
|
||||
try {
|
||||
const config = webhookData.providerConfig as unknown as RssWebhookConfig
|
||||
|
||||
if (!config?.feedUrl) {
|
||||
logger.error(`[${requestId}] Missing feedUrl for webhook ${webhookId}`)
|
||||
await markWebhookFailed(webhookId)
|
||||
failureCount++
|
||||
return
|
||||
}
|
||||
|
||||
const now = new Date()
|
||||
|
||||
const { feed, items: newItems } = await fetchNewRssItems(config, requestId)
|
||||
|
||||
if (!newItems.length) {
|
||||
await updateWebhookConfig(webhookId, config, now.toISOString(), [])
|
||||
await markWebhookSuccess(webhookId)
|
||||
logger.info(`[${requestId}] No new items found for webhook ${webhookId}`)
|
||||
successCount++
|
||||
return
|
||||
}
|
||||
|
||||
logger.info(`[${requestId}] Found ${newItems.length} new items for webhook ${webhookId}`)
|
||||
|
||||
const { processedCount, failedCount: itemFailedCount } = await processRssItems(
|
||||
newItems,
|
||||
feed,
|
||||
webhookData,
|
||||
requestId
|
||||
)
|
||||
|
||||
// Collect guids from processed items
|
||||
const newGuids = newItems
|
||||
.map((item) => item.guid || item.link || '')
|
||||
.filter((guid) => guid.length > 0)
|
||||
|
||||
await updateWebhookConfig(webhookId, config, now.toISOString(), newGuids)
|
||||
|
||||
if (itemFailedCount > 0 && processedCount === 0) {
|
||||
await markWebhookFailed(webhookId)
|
||||
failureCount++
|
||||
logger.warn(
|
||||
`[${requestId}] All ${itemFailedCount} items failed to process for webhook ${webhookId}`
|
||||
)
|
||||
} else {
|
||||
await markWebhookSuccess(webhookId)
|
||||
successCount++
|
||||
logger.info(
|
||||
`[${requestId}] Successfully processed ${processedCount} items for webhook ${webhookId}${itemFailedCount > 0 ? ` (${itemFailedCount} failed)` : ''}`
|
||||
)
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error processing RSS webhook ${webhookId}:`, error)
|
||||
await markWebhookFailed(webhookId)
|
||||
failureCount++
|
||||
}
|
||||
}
|
||||
|
||||
for (const webhookData of activeWebhooks) {
|
||||
const promise = enqueue(webhookData)
|
||||
.then(() => {})
|
||||
.catch((err) => {
|
||||
logger.error('Unexpected error in webhook processing:', err)
|
||||
failureCount++
|
||||
})
|
||||
|
||||
running.push(promise)
|
||||
|
||||
if (running.length >= CONCURRENCY) {
|
||||
const completedIdx = await Promise.race(running.map((p, i) => p.then(() => i)))
|
||||
running.splice(completedIdx, 1)
|
||||
}
|
||||
}
|
||||
|
||||
await Promise.allSettled(running)
|
||||
|
||||
const summary = {
|
||||
total: activeWebhooks.length,
|
||||
successful: successCount,
|
||||
failed: failureCount,
|
||||
details: [],
|
||||
}
|
||||
|
||||
logger.info('RSS polling completed', {
|
||||
total: summary.total,
|
||||
successful: summary.successful,
|
||||
failed: summary.failed,
|
||||
})
|
||||
|
||||
return summary
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error('Error in RSS polling service:', errorMessage)
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
async function fetchNewRssItems(
|
||||
config: RssWebhookConfig,
|
||||
requestId: string
|
||||
): Promise<{ feed: RssFeed; items: RssItem[] }> {
|
||||
try {
|
||||
logger.debug(`[${requestId}] Fetching RSS feed: ${config.feedUrl}`)
|
||||
|
||||
// Parse the RSS feed
|
||||
const feed = await parser.parseURL(config.feedUrl)
|
||||
|
||||
if (!feed.items || !feed.items.length) {
|
||||
logger.debug(`[${requestId}] No items in feed`)
|
||||
return { feed: feed as RssFeed, items: [] }
|
||||
}
|
||||
|
||||
// Filter new items based on timestamp and guids
|
||||
const lastCheckedTime = config.lastCheckedTimestamp
|
||||
? new Date(config.lastCheckedTimestamp)
|
||||
: null
|
||||
const lastSeenGuids = new Set(config.lastSeenGuids || [])
|
||||
|
||||
const newItems = feed.items.filter((item) => {
|
||||
const itemGuid = item.guid || item.link || ''
|
||||
|
||||
// Check if we've already seen this item by guid
|
||||
if (itemGuid && lastSeenGuids.has(itemGuid)) {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if the item is newer than our last check
|
||||
if (lastCheckedTime && item.isoDate) {
|
||||
const itemDate = new Date(item.isoDate)
|
||||
if (itemDate <= lastCheckedTime) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
// Sort by date, newest first
|
||||
newItems.sort((a, b) => {
|
||||
const dateA = a.isoDate ? new Date(a.isoDate).getTime() : 0
|
||||
const dateB = b.isoDate ? new Date(b.isoDate).getTime() : 0
|
||||
return dateB - dateA
|
||||
})
|
||||
|
||||
// Limit to 25 items per poll to prevent overwhelming the system
|
||||
const limitedItems = newItems.slice(0, 25)
|
||||
|
||||
logger.info(
|
||||
`[${requestId}] Found ${newItems.length} new items (processing ${limitedItems.length})`
|
||||
)
|
||||
|
||||
return { feed: feed as RssFeed, items: limitedItems as RssItem[] }
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] Error fetching RSS feed:`, errorMessage)
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
async function processRssItems(
|
||||
items: RssItem[],
|
||||
feed: RssFeed,
|
||||
webhookData: any,
|
||||
requestId: string
|
||||
): Promise<{ processedCount: number; failedCount: number }> {
|
||||
let processedCount = 0
|
||||
let failedCount = 0
|
||||
|
||||
for (const item of items) {
|
||||
try {
|
||||
const itemGuid = item.guid || item.link || `${item.title}-${item.pubDate}`
|
||||
|
||||
await pollingIdempotency.executeWithIdempotency(
|
||||
'rss',
|
||||
`${webhookData.id}:${itemGuid}`,
|
||||
async () => {
|
||||
const payload: RssWebhookPayload = {
|
||||
item: {
|
||||
title: item.title,
|
||||
link: item.link,
|
||||
pubDate: item.pubDate,
|
||||
guid: item.guid,
|
||||
description: item.description,
|
||||
content: item.content,
|
||||
contentSnippet: item.contentSnippet,
|
||||
author: item.author || item.creator,
|
||||
categories: item.categories,
|
||||
enclosure: item.enclosure,
|
||||
isoDate: item.isoDate,
|
||||
},
|
||||
feed: {
|
||||
title: feed.title,
|
||||
link: feed.link,
|
||||
description: feed.description,
|
||||
},
|
||||
timestamp: new Date().toISOString(),
|
||||
}
|
||||
|
||||
const webhookUrl = `${getBaseUrl()}/api/webhooks/trigger/${webhookData.path}`
|
||||
|
||||
const response = await fetch(webhookUrl, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Webhook-Secret': webhookData.secret || '',
|
||||
'User-Agent': 'SimStudio/1.0',
|
||||
},
|
||||
body: JSON.stringify(payload),
|
||||
})
|
||||
|
||||
if (!response.ok) {
|
||||
const errorText = await response.text()
|
||||
logger.error(
|
||||
`[${requestId}] Failed to trigger webhook for item ${itemGuid}:`,
|
||||
response.status,
|
||||
errorText
|
||||
)
|
||||
throw new Error(`Webhook request failed: ${response.status} - ${errorText}`)
|
||||
}
|
||||
|
||||
return {
|
||||
itemGuid,
|
||||
webhookStatus: response.status,
|
||||
processed: true,
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
logger.info(
|
||||
`[${requestId}] Successfully processed item ${item.title || itemGuid} for webhook ${webhookData.id}`
|
||||
)
|
||||
processedCount++
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] Error processing item:`, errorMessage)
|
||||
failedCount++
|
||||
}
|
||||
}
|
||||
|
||||
return { processedCount, failedCount }
|
||||
}
|
||||
|
||||
async function updateWebhookConfig(
|
||||
webhookId: string,
|
||||
_config: RssWebhookConfig,
|
||||
timestamp: string,
|
||||
newGuids: string[]
|
||||
) {
|
||||
try {
|
||||
const result = await db.select().from(webhook).where(eq(webhook.id, webhookId))
|
||||
const existingConfig = (result[0]?.providerConfig as Record<string, any>) || {}
|
||||
|
||||
// Merge new guids with existing ones, keeping only the most recent
|
||||
const existingGuids = existingConfig.lastSeenGuids || []
|
||||
const allGuids = [...newGuids, ...existingGuids].slice(0, MAX_GUIDS_TO_TRACK)
|
||||
|
||||
await db
|
||||
.update(webhook)
|
||||
.set({
|
||||
providerConfig: {
|
||||
...existingConfig,
|
||||
lastCheckedTimestamp: timestamp,
|
||||
lastSeenGuids: allGuids,
|
||||
} as any,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(webhook.id, webhookId))
|
||||
} catch (err) {
|
||||
logger.error(`Failed to update webhook ${webhookId} config:`, err)
|
||||
}
|
||||
}
|
||||
@@ -795,6 +795,33 @@ export async function formatWebhookInput(
|
||||
return body
|
||||
}
|
||||
|
||||
if (foundWebhook.provider === 'rss') {
|
||||
if (body && typeof body === 'object' && 'item' in body) {
|
||||
const item = body.item as Record<string, any>
|
||||
const feed = body.feed as Record<string, any>
|
||||
|
||||
return {
|
||||
title: item?.title,
|
||||
link: item?.link,
|
||||
pubDate: item?.pubDate,
|
||||
item,
|
||||
feed,
|
||||
webhook: {
|
||||
data: {
|
||||
provider: 'rss',
|
||||
path: foundWebhook.path,
|
||||
providerConfig: foundWebhook.providerConfig,
|
||||
payload: body,
|
||||
headers: Object.fromEntries(request.headers.entries()),
|
||||
method: request.method,
|
||||
},
|
||||
},
|
||||
workflowId: foundWorkflow.id,
|
||||
}
|
||||
}
|
||||
return body
|
||||
}
|
||||
|
||||
if (foundWebhook.provider === 'hubspot') {
|
||||
const events = Array.isArray(body) ? body : [body]
|
||||
const event = events[0]
|
||||
@@ -2344,6 +2371,41 @@ export async function configureOutlookPolling(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure RSS polling for a webhook
|
||||
*/
|
||||
export async function configureRssPolling(webhookData: any, requestId: string): Promise<boolean> {
|
||||
const logger = createLogger('RssWebhookSetup')
|
||||
logger.info(`[${requestId}] Setting up RSS polling for webhook ${webhookData.id}`)
|
||||
|
||||
try {
|
||||
const providerConfig = (webhookData.providerConfig as Record<string, any>) || {}
|
||||
const now = new Date()
|
||||
|
||||
await db
|
||||
.update(webhook)
|
||||
.set({
|
||||
providerConfig: {
|
||||
...providerConfig,
|
||||
lastCheckedTimestamp: now.toISOString(),
|
||||
lastSeenGuids: [],
|
||||
setupCompleted: true,
|
||||
},
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(webhook.id, webhookData.id))
|
||||
|
||||
logger.info(`[${requestId}] Successfully configured RSS polling for webhook ${webhookData.id}`)
|
||||
return true
|
||||
} catch (error: any) {
|
||||
logger.error(`[${requestId}] Failed to configure RSS polling`, {
|
||||
webhookId: webhookData.id,
|
||||
error: error.message,
|
||||
})
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
export function convertSquareBracketsToTwiML(twiml: string | undefined): string | undefined {
|
||||
if (!twiml) {
|
||||
return twiml
|
||||
|
||||
@@ -72,6 +72,7 @@ import {
|
||||
microsoftTeamsWebhookTrigger,
|
||||
} from '@/triggers/microsoftteams'
|
||||
import { outlookPollingTrigger } from '@/triggers/outlook'
|
||||
import { rssPollingTrigger } from '@/triggers/rss'
|
||||
import { slackWebhookTrigger } from '@/triggers/slack'
|
||||
import { stripeWebhookTrigger } from '@/triggers/stripe'
|
||||
import { telegramWebhookTrigger } from '@/triggers/telegram'
|
||||
@@ -131,6 +132,7 @@ export const TRIGGER_REGISTRY: TriggerRegistry = {
|
||||
microsoftteams_webhook: microsoftTeamsWebhookTrigger,
|
||||
microsoftteams_chat_subscription: microsoftTeamsChatSubscriptionTrigger,
|
||||
outlook_poller: outlookPollingTrigger,
|
||||
rss_poller: rssPollingTrigger,
|
||||
stripe_webhook: stripeWebhookTrigger,
|
||||
telegram_webhook: telegramWebhookTrigger,
|
||||
typeform_webhook: typeformWebhookTrigger,
|
||||
|
||||
1
apps/sim/triggers/rss/index.ts
Normal file
1
apps/sim/triggers/rss/index.ts
Normal file
@@ -0,0 +1 @@
|
||||
export { rssPollingTrigger } from './poller'
|
||||
115
apps/sim/triggers/rss/poller.ts
Normal file
115
apps/sim/triggers/rss/poller.ts
Normal file
@@ -0,0 +1,115 @@
|
||||
import { RssIcon } from '@/components/icons'
|
||||
import type { TriggerConfig } from '@/triggers/types'
|
||||
|
||||
export const rssPollingTrigger: TriggerConfig = {
|
||||
id: 'rss_poller',
|
||||
name: 'RSS Feed Trigger',
|
||||
provider: 'rss',
|
||||
description: 'Triggers when new items are published to an RSS feed',
|
||||
version: '1.0.0',
|
||||
icon: RssIcon,
|
||||
|
||||
subBlocks: [
|
||||
{
|
||||
id: 'feedUrl',
|
||||
title: 'Feed URL',
|
||||
type: 'short-input',
|
||||
placeholder: 'https://example.com/feed.xml',
|
||||
description: 'The URL of the RSS or Atom feed to monitor',
|
||||
required: true,
|
||||
mode: 'trigger',
|
||||
},
|
||||
{
|
||||
id: 'triggerInstructions',
|
||||
title: 'Setup Instructions',
|
||||
hideFromPreview: true,
|
||||
type: 'text',
|
||||
defaultValue: [
|
||||
'Enter the URL of any RSS or Atom feed you want to monitor',
|
||||
'The feed will be checked every minute for new items',
|
||||
'When a new item is published, your workflow will be triggered with the item data',
|
||||
]
|
||||
.map(
|
||||
(instruction, index) =>
|
||||
`<div class="mb-3"><strong>${index + 1}.</strong> ${instruction}</div>`
|
||||
)
|
||||
.join(''),
|
||||
mode: 'trigger',
|
||||
},
|
||||
{
|
||||
id: 'triggerSave',
|
||||
title: '',
|
||||
type: 'trigger-save',
|
||||
hideFromPreview: true,
|
||||
mode: 'trigger',
|
||||
triggerId: 'rss_poller',
|
||||
},
|
||||
],
|
||||
|
||||
outputs: {
|
||||
item: {
|
||||
title: {
|
||||
type: 'string',
|
||||
description: 'Item title',
|
||||
},
|
||||
link: {
|
||||
type: 'string',
|
||||
description: 'Item link/URL',
|
||||
},
|
||||
pubDate: {
|
||||
type: 'string',
|
||||
description: 'Publication date',
|
||||
},
|
||||
guid: {
|
||||
type: 'string',
|
||||
description: 'Unique identifier',
|
||||
},
|
||||
summary: {
|
||||
type: 'string',
|
||||
description: 'Item description/summary',
|
||||
},
|
||||
content: {
|
||||
type: 'string',
|
||||
description: 'Full content (content:encoded)',
|
||||
},
|
||||
contentSnippet: {
|
||||
type: 'string',
|
||||
description: 'Content snippet without HTML',
|
||||
},
|
||||
author: {
|
||||
type: 'string',
|
||||
description: 'Author name',
|
||||
},
|
||||
categories: {
|
||||
type: 'json',
|
||||
description: 'Categories/tags array',
|
||||
},
|
||||
enclosure: {
|
||||
type: 'json',
|
||||
description: 'Media attachment info (url, type, length)',
|
||||
},
|
||||
isoDate: {
|
||||
type: 'string',
|
||||
description: 'Publication date in ISO format',
|
||||
},
|
||||
},
|
||||
feed: {
|
||||
title: {
|
||||
type: 'string',
|
||||
description: 'Feed title',
|
||||
},
|
||||
link: {
|
||||
type: 'string',
|
||||
description: 'Feed website link',
|
||||
},
|
||||
feedDescription: {
|
||||
type: 'string',
|
||||
description: 'Feed description',
|
||||
},
|
||||
},
|
||||
timestamp: {
|
||||
type: 'string',
|
||||
description: 'Event timestamp',
|
||||
},
|
||||
},
|
||||
}
|
||||
12
bun.lock
12
bun.lock
@@ -1,5 +1,6 @@
|
||||
{
|
||||
"lockfileVersion": 1,
|
||||
"configVersion": 0,
|
||||
"workspaces": {
|
||||
"": {
|
||||
"name": "simstudio",
|
||||
@@ -20,6 +21,7 @@
|
||||
"onedollarstats": "0.0.10",
|
||||
"postgres": "^3.4.5",
|
||||
"remark-gfm": "4.0.1",
|
||||
"rss-parser": "3.13.0",
|
||||
"socket.io-client": "4.8.1",
|
||||
"twilio": "5.9.0",
|
||||
},
|
||||
@@ -2893,6 +2895,8 @@
|
||||
|
||||
"rrweb-cssom": ["rrweb-cssom@0.8.0", "", {}, "sha512-guoltQEx+9aMf2gDZ0s62EcV8lsXR+0w8915TC3ITdn2YueuNjdAYh/levpU9nFaoChh9RUS5ZdQMrKfVEN9tw=="],
|
||||
|
||||
"rss-parser": ["rss-parser@3.13.0", "", { "dependencies": { "entities": "^2.0.3", "xml2js": "^0.5.0" } }, "sha512-7jWUBV5yGN3rqMMj7CZufl/291QAhvrrGpDNE4k/02ZchL0npisiYYqULF71jCEKoIiHvK/Q2e6IkDwPziT7+w=="],
|
||||
|
||||
"run-async": ["run-async@2.4.1", "", {}, "sha512-tvVnVv01b8c1RrA6Ep7JkStj85Guv/YrMcwqYQnwjsAS2cTmmPGBBjAjpCW7RrSodNSoE2/qg9O4bceNvUuDgQ=="],
|
||||
|
||||
"run-exclusive": ["run-exclusive@2.2.19", "", { "dependencies": { "minimal-polyfills": "^2.2.3" } }, "sha512-K3mdoAi7tjJ/qT7Flj90L7QyPozwUaAG+CVhkdDje4HLKXUYC3N/Jzkau3flHVDLQVhiHBtcimVodMjN9egYbA=="],
|
||||
@@ -2911,6 +2915,8 @@
|
||||
|
||||
"satori": ["satori@0.12.2", "", { "dependencies": { "@shuding/opentype.js": "1.4.0-beta.0", "css-background-parser": "^0.1.0", "css-box-shadow": "1.0.0-3", "css-gradient-parser": "^0.0.16", "css-to-react-native": "^3.0.0", "emoji-regex": "^10.2.1", "escape-html": "^1.0.3", "linebreak": "^1.1.0", "parse-css-color": "^0.2.1", "postcss-value-parser": "^4.2.0", "yoga-wasm-web": "^0.3.3" } }, "sha512-3C/laIeE6UUe9A+iQ0A48ywPVCCMKCNSTU5Os101Vhgsjd3AAxGNjyq0uAA8kulMPK5n0csn8JlxPN9riXEjLA=="],
|
||||
|
||||
"sax": ["sax@1.4.3", "", {}, "sha512-yqYn1JhPczigF94DMS+shiDMjDowYO6y9+wB/4WgO0Y19jWYk0lQ4tuG5KI7kj4FTp1wxPj5IFfcrz/s1c3jjQ=="],
|
||||
|
||||
"saxes": ["saxes@6.0.0", "", { "dependencies": { "xmlchars": "^2.2.0" } }, "sha512-xAg7SOnEhrm5zI3puOOKyy1OMcMlIJZYNJY7xLBwSze0UjhPLnWfj2GF2EpT0jmzaJKIWKHLsaSSajf35bcYnA=="],
|
||||
|
||||
"scheduler": ["scheduler@0.27.0", "", {}, "sha512-eNv+WrVbKu1f3vbYJT/xtiF5syA5HPIMtf9IgY/nKg0sWqzAUEvqY/xm7OcZc/qafLx/iO9FgOmeSAp4v5ti/Q=="],
|
||||
@@ -3291,6 +3297,8 @@
|
||||
|
||||
"xml-name-validator": ["xml-name-validator@5.0.0", "", {}, "sha512-EvGK8EJ3DhaHfbRlETOWAS5pO9MZITeauHKJyb8wyajUfQUenkIg2MvLDTZ4T/TgIcm3HU0TFBgWWboAZ30UHg=="],
|
||||
|
||||
"xml2js": ["xml2js@0.5.0", "", { "dependencies": { "sax": ">=0.6.0", "xmlbuilder": "~11.0.0" } }, "sha512-drPFnkQJik/O+uPKpqSgr22mpuFHqKdbS835iAQrUC73L2F5WkboIRd63ai/2Yg6I1jzifPFKH2NTK+cfglkIA=="],
|
||||
|
||||
"xmlbuilder": ["xmlbuilder@13.0.2", "", {}, "sha512-Eux0i2QdDYKbdbA6AM6xE4m6ZTZr4G4xF9kahI2ukSEMCzwce2eX9WlTI5J3s+NU7hpasFsr8hWIONae7LluAQ=="],
|
||||
|
||||
"xmlchars": ["xmlchars@2.2.0", "", {}, "sha512-JZnDKK8B0RCDw84FNdDAIpZK+JuJw+s7Lz8nksI7SIuU3UXJJslUthsi+uWBUYOwPFwW7W7PRLRfUKpxjtjFCw=="],
|
||||
@@ -3777,6 +3785,8 @@
|
||||
|
||||
"rimraf/glob": ["glob@10.5.0", "", { "dependencies": { "foreground-child": "^3.1.0", "jackspeak": "^3.1.2", "minimatch": "^9.0.4", "minipass": "^7.1.2", "package-json-from-dist": "^1.0.0", "path-scurry": "^1.11.1" }, "bin": { "glob": "dist/esm/bin.mjs" } }, "sha512-DfXN8DfhJ7NH3Oe7cFmu3NCu1wKbkReJ8TorzSAFbSKrlNaQSKfIzqYqVY8zlbs2NLBbWpRiU52GX2PbaBVNkg=="],
|
||||
|
||||
"rss-parser/entities": ["entities@2.2.0", "", {}, "sha512-p92if5Nz619I0w+akJrLZH0MX0Pb5DX39XOwQTtXSdQQOaYH03S1uIQp4mhOZtAXrxq4ViO67YTiLBo2638o9A=="],
|
||||
|
||||
"samlify/uuid": ["uuid@8.3.2", "", { "bin": { "uuid": "dist/bin/uuid" } }, "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg=="],
|
||||
|
||||
"sim/@types/node": ["@types/node@24.2.1", "", { "dependencies": { "undici-types": "~7.10.0" } }, "sha512-DRh5K+ka5eJic8CjH7td8QpYEV6Zo10gfRkjHCO3weqZHWDtAaSTFtl4+VMqOJ4N5jcuhZ9/l+yy8rVgw7BQeQ=="],
|
||||
@@ -3835,6 +3845,8 @@
|
||||
|
||||
"xml-crypto/xpath": ["xpath@0.0.33", "", {}, "sha512-NNXnzrkDrAzalLhIUc01jO2mOzXGXh1JwPgkihcLLzw98c0WgYDmmjSh1Kl3wzaxSVWMuA+fe0WTWOBDWCBmNA=="],
|
||||
|
||||
"xml2js/xmlbuilder": ["xmlbuilder@11.0.1", "", {}, "sha512-fDlsI/kFEx7gLvbecc0/ohLG50fugQp8ryHzMTuW9vSa1GJ0XYWKnhsUx7oie3G98+r56aTQIUB4kht42R3JvA=="],
|
||||
|
||||
"@anthropic-ai/sdk/@types/node/undici-types": ["undici-types@5.26.5", "", {}, "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA=="],
|
||||
|
||||
"@anthropic-ai/sdk/node-fetch/whatwg-url": ["whatwg-url@5.0.0", "", { "dependencies": { "tr46": "~0.0.3", "webidl-conversions": "^3.0.0" } }, "sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw=="],
|
||||
|
||||
@@ -652,7 +652,16 @@ cronjobs:
|
||||
concurrencyPolicy: Forbid
|
||||
successfulJobsHistoryLimit: 3
|
||||
failedJobsHistoryLimit: 1
|
||||
|
||||
|
||||
rssWebhookPoll:
|
||||
enabled: true
|
||||
name: rss-webhook-poll
|
||||
schedule: "*/1 * * * *"
|
||||
path: "/api/webhooks/poll/rss"
|
||||
concurrencyPolicy: Forbid
|
||||
successfulJobsHistoryLimit: 3
|
||||
failedJobsHistoryLimit: 1
|
||||
|
||||
renewSubscriptions:
|
||||
enabled: true
|
||||
name: renew-subscriptions
|
||||
|
||||
@@ -50,6 +50,7 @@
|
||||
"onedollarstats": "0.0.10",
|
||||
"postgres": "^3.4.5",
|
||||
"remark-gfm": "4.0.1",
|
||||
"rss-parser": "3.13.0",
|
||||
"socket.io-client": "4.8.1",
|
||||
"twilio": "5.9.0"
|
||||
},
|
||||
|
||||
@@ -82,7 +82,11 @@ async function generateIconMapping(): Promise<Record<string, string>> {
|
||||
}
|
||||
|
||||
// Skip blocks that don't have documentation (same logic as generateBlockDoc)
|
||||
if (blockConfig.type.includes('_trigger') || blockConfig.type.includes('_webhook')) {
|
||||
if (
|
||||
blockConfig.type.includes('_trigger') ||
|
||||
blockConfig.type.includes('_webhook') ||
|
||||
blockConfig.type.includes('rss')
|
||||
) {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -95,7 +99,8 @@ async function generateIconMapping(): Promise<Record<string, string>> {
|
||||
blockConfig.type === 'webhook' ||
|
||||
blockConfig.type === 'schedule' ||
|
||||
blockConfig.type === 'mcp' ||
|
||||
blockConfig.type === 'generic_webhook'
|
||||
blockConfig.type === 'generic_webhook' ||
|
||||
blockConfig.type === 'rss'
|
||||
) {
|
||||
continue
|
||||
}
|
||||
@@ -910,7 +915,11 @@ async function generateBlockDoc(blockPath: string) {
|
||||
return
|
||||
}
|
||||
|
||||
if (blockConfig.type.includes('_trigger') || blockConfig.type.includes('_webhook')) {
|
||||
if (
|
||||
blockConfig.type.includes('_trigger') ||
|
||||
blockConfig.type.includes('_webhook') ||
|
||||
blockConfig.type.includes('rss')
|
||||
) {
|
||||
console.log(`Skipping ${blockConfig.type} - contains '_trigger'`)
|
||||
return
|
||||
}
|
||||
@@ -924,7 +933,8 @@ async function generateBlockDoc(blockPath: string) {
|
||||
blockConfig.type === 'webhook' ||
|
||||
blockConfig.type === 'schedule' ||
|
||||
blockConfig.type === 'mcp' ||
|
||||
blockConfig.type === 'generic_webhook'
|
||||
blockConfig.type === 'generic_webhook' ||
|
||||
blockConfig.type === 'rss'
|
||||
) {
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user