Compare commits

...

27 Commits

Author SHA1 Message Date
Waleed
1c2c2c65d4 v0.5.110: webhook execution speedups, SSRF patches 2026-03-11 15:00:24 -07:00
Waleed
19ef526886 fix(webhooks): eliminate redundant DB queries from webhook execution path (#3523)
* fix(webhooks): eliminate redundant DB queries from webhook execution path

* chore(webhooks): remove implementation-detail comments

* fix(webhooks): restore auth-first ordering and add credential resolution warning

- Revert parallel auth+preprocessing to sequential auth→preprocessing
  to prevent rate-limit exhaustion via unauthenticated requests
- Add warning log when credential account resolution fails in background job

* fix(webhooks): restore auth-before-reachability ordering and remove dead credentialAccountUserId field

- Move reachability test back after auth to prevent path enumeration
- Remove dead credentialAccountUserId from WebhookExecutionPayload
- Simplify credential resolution condition in background job
2026-03-11 14:51:04 -07:00
Waleed
ff2a1527ab fix(security): add SSRF protection to database tools and webhook delivery (#3500)
* fix(security): add SSRF protection to database tools and webhook delivery

* fix(security): address review comments on SSRF PR

- Remove Promise.race timeout pattern to avoid unhandled rejections
  (http.request timeout is sufficient for webhook delivery)
- Use safeCompare in verifyCronAuth instead of inline HMAC logic
- Strip IPv6 brackets before validateDatabaseHost in Redis route

* fix(security): allow HTTP webhooks and fix misleading MCP error docs

- Add allowHttp option to validateExternalUrl, validateUrlWithDNS,
  and secureFetchWithValidation to support HTTP webhook URLs
- Pass allowHttp: true for webhook delivery and test endpoints
- Fix misleading JSDoc on createMcpErrorResponse (doesn't log errors)
- Mark unused error param with underscore prefix

* fix(security): forward allowHttp option through redirect validation

Pass allowHttp to validateUrlWithDNS in the redirect handler of
secureFetchWithPinnedIP so HTTP-to-HTTP redirects work when allowHttp
is enabled for webhook delivery.

* fix(security): block localhost when allowHttp is enabled

When allowHttp is true (user-supplied webhook URLs), explicitly block
localhost/loopback in both validateExternalUrl and validateUrlWithDNS
to prevent SSRF against internal services.

* fix(security): always strip multi-line content in sanitizeConnectionError

Take the first line of the error message regardless of length to
prevent leaking sensitive data from multi-line error messages.
2026-03-09 20:28:28 -07:00
Waleed
2e1c639a81 fix(parallel): align integration with Parallel AI API docs (#3501)
* fix(parallel): align integration with Parallel AI API docs

* fix(parallel): keep processor subBlock ID for backwards compatibility

* fix(parallel): move error field to top level per ToolResponse interface

* fix(parallel): guard research_input and prevent domain leakage across operations

* fix(parallel): make url/title nullable in types to match transformResponse

* fix(parallel): revert search_queries param type to string for backwards compatibility
2026-03-09 19:47:30 -07:00
Waleed
ecd3536a72 v0.5.109: obsidian and evernote integrations, slack fixes, remove memory instrumentation 2026-03-09 10:40:37 -07:00
Theodore Li
635179d696 Revert "feat(hosted key): Add exa hosted key (#3221)" (#3495)
This reverts commit 158d5236bc.

Co-authored-by: Theodore Li <teddy@zenobiapay.com>
2026-03-09 10:31:54 -07:00
Waleed
f88926a6a8 fix(webhooks): return empty 200 for Slack to close modals cleanly (#3492)
* fix(webhooks): return empty 200 for Slack to close modals cleanly

* fix(webhooks): add clarifying comment on Slack error path trade-off
2026-03-09 10:11:36 -07:00
Waleed
690b47a0bf chore(monitoring): remove SSE connection tracking and Bun.gc debug instrumentation (#3472) 2026-03-08 17:27:05 -07:00
Theodore Li
158d5236bc feat(hosted key): Add exa hosted key (#3221)
* feat(hosted keys): Implement serper hosted key

* Handle required fields correctly for hosted keys

* Add rate limiting (3 tries, exponential backoff)

* Add custom pricing, switch to exa as first hosted key

* Add telemetry

* Consolidate byok type definitions

* Add warning comment if default calculation is used

* Record usage to user stats table

* Fix unit tests, use cost property

* Include more metadata in cost output

* Fix disabled tests

* Fix spacing

* Fix lint

* Move knowledge cost restructuring away from generic block handler

* Migrate knowledge unit tests

* Lint

* Fix broken tests

* Add user based hosted key throttling

* Refactor hosted key handling. Add optimistic handling of throttling for custom throttle rules.

* Remove research as hosted key. Recommend BYOK if throtttling occurs

* Make adding api keys adjustable via env vars

* Remove vestigial fields from research

* Make billing actor id required for throttling

* Switch to round robin for api key distribution

* Add helper method for adding hosted key cost

* Strip leading double underscores to avoid breaking change

* Lint fix

* Remove falsy check in favor for explicit null check

* Add more detailed metrics for different throttling types

* Fix _costDollars field

* Handle hosted agent tool calls

* Fail loudly if cost field isn't found

* Remove any type

* Fix type error

* Fix lint

* Fix usage log double logging data

* Fix test

---------

Co-authored-by: Theodore Li <teddy@zenobiapay.com>
2026-03-07 13:06:57 -05:00
Vikhyath Mondreti
8c0a2e04b1 v0.5.108: workflow input params in agent tools, bun upgrade, dropdown selectors for 14 blocks 2026-03-06 21:02:25 -08:00
Waleed
6586c5ce40 v0.5.107: new reddit, slack tools 2026-03-05 22:48:20 -08:00
Vikhyath Mondreti
3ce947566d v0.5.106: condition block and legacy kbs fixes, GPT 5.4 2026-03-05 17:30:05 -08:00
Waleed
70c36cb7aa v0.5.105: slack remove reaction, nested subflow locks fix, servicenow pagination, memory improvements 2026-03-04 22:38:26 -08:00
Waleed
f1ec5fe824 v0.5.104: memory improvements, nested subflows, careers page redirect, brandfetch, google meet 2026-03-03 23:45:29 -08:00
Waleed
e07e3c34cc v0.5.103: memory util instrumentation, API docs, amplitude, google pagespeed insights, pagerduty 2026-03-01 23:27:02 -08:00
Waleed
0d2e6ff31d v0.5.102: new integrations, new tools, ci speedups, memory leak instrumentation 2026-02-28 12:48:10 -08:00
Waleed
4fd0989264 v0.5.101: circular dependency mitigation, confluence enhancements, google tasks and bigquery integrations, workflow lock 2026-02-26 15:04:53 -08:00
Waleed
67f8a687f6 v0.5.100: multiple credentials, 40% speedup, gong, attio, audit log improvements 2026-02-25 00:28:25 -08:00
Waleed
af592349d3 v0.5.99: local dev improvements, live workflow logs in terminal 2026-02-23 00:24:49 -08:00
Waleed
0d86ea01f0 v0.5.98: change detection improvements, rate limit and code execution fixes, removed retired models, hex integration 2026-02-21 18:07:40 -08:00
Waleed
115f04e989 v0.5.97: oidc discovery for copilot mcp 2026-02-21 02:06:25 -08:00
Waleed
34d92fae89 v0.5.96: sim oauth provider, slack ephemeral message tool and blockkit support 2026-02-20 18:22:20 -08:00
Waleed
67aa4bb332 v0.5.95: gemini 3.1 pro, cloudflare, dataverse, revenuecat, redis, upstash, algolia tools; isolated-vm robustness improvements, tables backend (#3271)
* feat(tools): advanced fields for youtube, vercel; added cloudflare and dataverse tools (#3257)

* refactor(vercel): mark optional fields as advanced mode

Move optional/power-user fields behind the advanced toggle:
- List Deployments: project filter, target, state
- Create Deployment: project ID override, redeploy from, target
- List Projects: search
- Create/Update Project: framework, build/output/install commands
- Env Vars: variable type
- Webhooks: project IDs filter
- Checks: path, details URL
- Team Members: role filter
- All operations: team ID scope

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* style(youtube): mark optional params as advanced mode

Hide pagination, sort order, and filter fields behind the advanced
toggle for a cleaner default UX across all YouTube operations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* added advanced fields for vercel and youtube, added cloudflare and dataverse block

* addded desc for dataverse

* add more tools

* ack comment

* more

* ops

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

* feat(tables): added tables (#2867)

* updates

* required

* trashy table viewer

* updates

* updates

* filtering ui

* updates

* updates

* updates

* one input mode

* format

* fix lints

* improved errors

* updates

* updates

* chages

* doc strings

* breaking down file

* update comments with ai

* updates

* comments

* changes

* revert

* updates

* dedupe

* updates

* updates

* updates

* refactoring

* renames & refactors

* refactoring

* updates

* undo

* update db

* wand

* updates

* fix comments

* fixes

* simplify comments

* u[dates

* renames

* better comments

* validation

* updates

* updates

* updates

* fix sorting

* fix appearnce

* updating prompt to make it user sort

* rm

* updates

* rename

* comments

* clean comments

* simplicifcaiton

* updates

* updates

* refactor

* reduced type confusion

* undo

* rename

* undo changes

* undo

* simplify

* updates

* updates

* revert

* updates

* db updates

* type fix

* fix

* fix error handling

* updates

* docs

* docs

* updates

* rename

* dedupe

* revert

* uncook

* updates

* fix

* fix

* fix

* fix

* prepare merge

* readd migrations

* add back missed code

* migrate enrichment logic to general abstraction

* address bugbot concerns

* adhere to size limits for tables

* remove conflicting migration

* add back migrations

* fix tables auth

* fix permissive auth

* fix lint

* reran migrations

* migrate to use tanstack query for all server state

* update table-selector

* update names

* added tables to permission groups, updated subblock types

---------

Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
Co-authored-by: waleed <walif6@gmail.com>

* fix(snapshot): changed insert to upsert when concurrent identical child workflows are running (#3259)

* fix(snapshot): changed insert to upsert when concurrent identical child workflows are running

* fixed ci tests failing

* fix(workflows): disallow duplicate workflow names at the same folder level (#3260)

* feat(tools): added redis, upstash, algolia, and revenuecat (#3261)

* feat(tools): added redis, upstash, algolia, and revenuecat

* ack comment

* feat(models): add gemini-3.1-pro-preview and update gemini-3-pro thinking levels (#3263)

* fix(audit-log): lazily resolve actor name/email when missing (#3262)

* fix(blocks): move type coercions from tools.config.tool to tools.config.params (#3264)

* fix(blocks): move type coercions from tools.config.tool to tools.config.params

Number() coercions in tools.config.tool ran at serialization time before
variable resolution, destroying dynamic references like <block.result.count>
by converting them to NaN/null. Moved all coercions to tools.config.params
which runs at execution time after variables are resolved.

Fixed in 15 blocks: exa, arxiv, sentry, incidentio, wikipedia, ahrefs,
posthog, elasticsearch, dropbox, hunter, lemlist, spotify, youtube, grafana,
parallel. Also added mode: 'advanced' to optional exa fields.

Closes #3258

* fix(blocks): address PR review — move remaining param mutations from tool() to params()

- Moved field mappings from tool() to params() in grafana, posthog,
  lemlist, spotify, dropbox (same dynamic reference bug)
- Fixed parallel.ts excerpts/full_content boolean logic
- Fixed parallel.ts search_queries empty case (must set undefined)
- Fixed elasticsearch.ts timeout not included when already ends with 's'
- Restored dropbox.ts tool() switch for proper default fallback

* fix(blocks): restore field renames to tool() for serialization-time validation

Field renames (e.g. personalApiKey→apiKey) must be in tool() because
validateRequiredFieldsBeforeExecution calls selectToolId()→tool() then
checks renamed field names on params. Only type coercions (Number(),
boolean) stay in params() to avoid destroying dynamic variable references.

* improvement(resolver): resovled empty sentinel to not pass through unexecuted valid refs to text inputs (#3266)

* fix(blocks): add required constraint for serviceDeskId in JSM block (#3268)

* fix(blocks): add required constraint for serviceDeskId in JSM block

* fix(blocks): rename custom field values to request field values in JSM create request

* fix(trigger): add isolated-vm support to trigger.dev container builds (#3269)

Scheduled workflow executions running in trigger.dev containers were
failing to spawn isolated-vm workers because the native module wasn't
available in the container. This caused loop condition evaluation to
silently fail and exit after one iteration.

- Add isolated-vm to build.external and additionalPackages in trigger config
- Include isolated-vm-worker.cjs via additionalFiles for child process spawning
- Add fallback path resolution for worker file in trigger.dev environment

* fix(tables): hide tables from sidebar and block registry (#3270)

* fix(tables): hide tables from sidebar and block registry

* fix(trigger): add isolated-vm support to trigger.dev container builds (#3269)

Scheduled workflow executions running in trigger.dev containers were
failing to spawn isolated-vm workers because the native module wasn't
available in the container. This caused loop condition evaluation to
silently fail and exit after one iteration.

- Add isolated-vm to build.external and additionalPackages in trigger config
- Include isolated-vm-worker.cjs via additionalFiles for child process spawning
- Add fallback path resolution for worker file in trigger.dev environment

* lint

* fix(trigger): update node version to align with main app (#3272)

* fix(build): fix corrupted sticky disk cache on blacksmith (#3273)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Lakee Sivaraya <71339072+lakeesiv@users.noreply.github.com>
Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
2026-02-20 13:43:07 -08:00
Waleed
15ace5e63f v0.5.94: vercel integration, folder insertion, migrated tracking redirects to rewrites 2026-02-18 16:53:34 -08:00
Waleed
fdca73679d v0.5.93: NextJS config changes, MCP and Blocks whitelisting, copilot keyboard shortcuts, audit logs 2026-02-18 12:10:05 -08:00
Waleed
da46a387c9 v0.5.92: shortlinks, copilot scrolling stickiness, pagination 2026-02-17 15:13:21 -08:00
Waleed
b7e377ec4b v0.5.91: docs i18n, turborepo upgrade 2026-02-16 00:36:05 -08:00
54 changed files with 703 additions and 514 deletions

View File

@@ -44,20 +44,24 @@ Search the web using Parallel AI. Provides comprehensive search results with int
| Parameter | Type | Required | Description |
| --------- | ---- | -------- | ----------- |
| `objective` | string | Yes | The search objective or question to answer |
| `search_queries` | string | No | Optional comma-separated list of search queries to execute |
| `processor` | string | No | Processing method: base or pro \(default: base\) |
| `max_results` | number | No | Maximum number of results to return \(default: 5\) |
| `max_chars_per_result` | number | No | Maximum characters per result \(default: 1500\) |
| `search_queries` | string | No | Comma-separated list of search queries to execute |
| `mode` | string | No | Search mode: one-shot, agentic, or fast \(default: one-shot\) |
| `max_results` | number | No | Maximum number of results to return \(default: 10\) |
| `max_chars_per_result` | number | No | Maximum characters per result excerpt \(minimum: 1000\) |
| `include_domains` | string | No | Comma-separated list of domains to restrict search results to |
| `exclude_domains` | string | No | Comma-separated list of domains to exclude from search results |
| `apiKey` | string | Yes | Parallel AI API Key |
#### Output
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| `search_id` | string | Unique identifier for this search request |
| `results` | array | Search results with excerpts from relevant pages |
| ↳ `url` | string | The URL of the search result |
| ↳ `title` | string | The title of the search result |
| ↳ `excerpts` | array | Text excerpts from the page |
| ↳ `publish_date` | string | Publication date of the page \(YYYY-MM-DD\) |
| ↳ `excerpts` | array | LLM-optimized excerpts from the page |
### `parallel_extract`
@@ -68,31 +72,33 @@ Extract targeted information from specific URLs using Parallel AI. Processes pro
| Parameter | Type | Required | Description |
| --------- | ---- | -------- | ----------- |
| `urls` | string | Yes | Comma-separated list of URLs to extract information from |
| `objective` | string | Yes | What information to extract from the provided URLs |
| `excerpts` | boolean | Yes | Include relevant excerpts from the content |
| `full_content` | boolean | Yes | Include full page content |
| `objective` | string | No | What information to extract from the provided URLs |
| `excerpts` | boolean | No | Include relevant excerpts from the content \(default: true\) |
| `full_content` | boolean | No | Include full page content as markdown \(default: false\) |
| `apiKey` | string | Yes | Parallel AI API Key |
#### Output
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| `extract_id` | string | Unique identifier for this extraction request |
| `results` | array | Extracted information from the provided URLs |
| ↳ `url` | string | The source URL |
| ↳ `title` | string | The title of the page |
| ↳ `content` | string | Extracted content |
| ↳ `excerpts` | array | Relevant text excerpts |
| ↳ `publish_date` | string | Publication date \(YYYY-MM-DD\) |
| ↳ `excerpts` | array | Relevant text excerpts in markdown |
| ↳ `full_content` | string | Full page content as markdown |
### `parallel_deep_research`
Conduct comprehensive deep research across the web using Parallel AI. Synthesizes information from multiple sources with citations. Can take up to 15 minutes to complete.
Conduct comprehensive deep research across the web using Parallel AI. Synthesizes information from multiple sources with citations. Can take up to 45 minutes to complete.
#### Input
| Parameter | Type | Required | Description |
| --------- | ---- | -------- | ----------- |
| `input` | string | Yes | Research query or question \(up to 15,000 characters\) |
| `processor` | string | No | Compute level: base, lite, pro, ultra, ultra2x, ultra4x, ultra8x \(default: base\) |
| `processor` | string | No | Processing tier: pro, ultra, pro-fast, ultra-fast \(default: pro\) |
| `include_domains` | string | No | Comma-separated list of domains to restrict research to \(source policy\) |
| `exclude_domains` | string | No | Comma-separated list of domains to exclude from research \(source policy\) |
| `apiKey` | string | Yes | Parallel AI API Key |
@@ -101,17 +107,17 @@ Conduct comprehensive deep research across the web using Parallel AI. Synthesize
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| `status` | string | Task status \(completed, failed\) |
| `status` | string | Task status \(completed, failed, running\) |
| `run_id` | string | Unique ID for this research task |
| `message` | string | Status message |
| `content` | object | Research results \(structured based on output_schema\) |
| `basis` | array | Citations and sources with reasoning and confidence levels |
| ↳ `field` | string | Output field name |
| ↳ `field` | string | Output field dot-notation path |
| ↳ `reasoning` | string | Explanation for the result |
| ↳ `citations` | array | Array of sources |
| ↳ `url` | string | Source URL |
| ↳ `title` | string | Source title |
| ↳ `excerpts` | array | Relevant excerpts from the source |
| ↳ `confidence` | string | Confidence level indicator |
| ↳ `confidence` | string | Confidence level \(high, medium\) |

View File

@@ -19,7 +19,6 @@ import { validateUrlWithDNS } from '@/lib/core/security/input-validation.server'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
import {
@@ -631,11 +630,9 @@ async function handleMessageStream(
}
const encoder = new TextEncoder()
let messageStreamDecremented = false
const stream = new ReadableStream({
async start(controller) {
incrementSSEConnections('a2a-message')
const sendEvent = (event: string, data: unknown) => {
try {
const jsonRpcResponse = {
@@ -845,19 +842,10 @@ async function handleMessageStream(
})
} finally {
await releaseLock(lockKey, lockValue)
if (!messageStreamDecremented) {
messageStreamDecremented = true
decrementSSEConnections('a2a-message')
}
controller.close()
}
},
cancel() {
if (!messageStreamDecremented) {
messageStreamDecremented = true
decrementSSEConnections('a2a-message')
}
},
cancel() {},
})
return new NextResponse(stream, {
@@ -1042,22 +1030,16 @@ async function handleTaskResubscribe(
{ once: true }
)
let sseDecremented = false
const cleanup = () => {
isCancelled = true
if (pollTimeoutId) {
clearTimeout(pollTimeoutId)
pollTimeoutId = null
}
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('a2a-resubscribe')
}
}
const stream = new ReadableStream({
async start(controller) {
incrementSSEConnections('a2a-resubscribe')
const sendEvent = (event: string, data: unknown): boolean => {
if (isCancelled || abortSignal.aborted) return false
try {

View File

@@ -14,7 +14,6 @@ import { getSession } from '@/lib/auth'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { mcpConnectionManager } from '@/lib/mcp/connection-manager'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
const logger = createLogger('McpEventsSSE')
@@ -50,14 +49,11 @@ export async function GET(request: NextRequest) {
for (const unsub of unsubscribers) {
unsub()
}
decrementSSEConnections('mcp-events')
logger.info(`SSE connection closed for workspace ${workspaceId}`)
}
const stream = new ReadableStream({
start(controller) {
incrementSSEConnections('mcp-events')
const send = (eventName: string, data: Record<string, unknown>) => {
if (cleaned) return
try {

View File

@@ -192,7 +192,8 @@ export const POST = withMcpAuth<{ id: string }>('read')(
)
} catch (error) {
connectionStatus = 'error'
lastError = error instanceof Error ? error.message : 'Connection test failed'
lastError =
error instanceof Error ? error.message.split('\n')[0].slice(0, 200) : 'Connection failed'
logger.warn(`[${requestId}] Failed to connect to server ${serverId}:`, error)
}

View File

@@ -41,6 +41,20 @@ interface TestConnectionResult {
warnings?: string[]
}
/**
* Extracts a user-friendly error message from connection errors.
* Keeps diagnostic info (timeout, DNS, HTTP status) but strips
* verbose internals (Zod details, full response bodies, stack traces).
*/
function sanitizeConnectionError(error: unknown): string {
if (!(error instanceof Error)) {
return 'Unknown connection error'
}
const firstLine = error.message.split('\n')[0]
return firstLine.length > 200 ? `${firstLine.slice(0, 200)}...` : firstLine
}
/**
* POST - Test connection to an MCP server before registering it
*/
@@ -137,8 +151,7 @@ export const POST = withMcpAuth('write')(
} catch (toolError) {
logger.warn(`[${requestId}] Connection established but could not list tools:`, toolError)
result.success = false
const errorMessage = toolError instanceof Error ? toolError.message : 'Unknown error'
result.error = `Connection established but could not list tools: ${errorMessage}`
result.error = 'Connection established but could not list tools'
result.warnings = result.warnings || []
result.warnings.push(
'Server connected but tool listing failed - connection may be incomplete'
@@ -163,11 +176,7 @@ export const POST = withMcpAuth('write')(
logger.warn(`[${requestId}] MCP server test failed:`, error)
result.success = false
if (error instanceof Error) {
result.error = error.message
} else {
result.error = 'Unknown connection error'
}
result.error = sanitizeConnectionError(error)
} finally {
if (client) {
try {

View File

@@ -89,11 +89,12 @@ export const POST = withMcpAuth('read')(
tool = tools.find((t) => t.name === toolName) ?? null
if (!tool) {
logger.warn(`[${requestId}] Tool ${toolName} not found on server ${serverId}`, {
availableTools: tools.map((t) => t.name),
})
return createMcpErrorResponse(
new Error(
`Tool ${toolName} not found on server ${serverId}. Available tools: ${tools.map((t) => t.name).join(', ')}`
),
'Tool not found',
new Error('Tool not found'),
'Tool not found on the specified server',
404
)
}

View File

@@ -76,7 +76,7 @@ export async function POST(request: NextRequest) {
return NextResponse.json(
{
success: false,
error: error instanceof Error ? error.message : 'Failed to cancel task',
error: 'Failed to cancel task',
},
{ status: 500 }
)

View File

@@ -86,7 +86,7 @@ export async function POST(request: NextRequest) {
return NextResponse.json(
{
success: false,
error: error instanceof Error ? error.message : 'Failed to delete push notification',
error: 'Failed to delete push notification',
},
{ status: 500 }
)

View File

@@ -84,7 +84,7 @@ export async function POST(request: NextRequest) {
return NextResponse.json(
{
success: false,
error: error instanceof Error ? error.message : 'Failed to fetch Agent Card',
error: 'Failed to fetch Agent Card',
},
{ status: 500 }
)

View File

@@ -107,7 +107,7 @@ export async function POST(request: NextRequest) {
return NextResponse.json(
{
success: false,
error: error instanceof Error ? error.message : 'Failed to get push notification',
error: 'Failed to get push notification',
},
{ status: 500 }
)

View File

@@ -87,7 +87,7 @@ export async function POST(request: NextRequest) {
return NextResponse.json(
{
success: false,
error: error instanceof Error ? error.message : 'Failed to get task',
error: 'Failed to get task',
},
{ status: 500 }
)

View File

@@ -111,7 +111,7 @@ export async function POST(request: NextRequest) {
return NextResponse.json(
{
success: false,
error: error instanceof Error ? error.message : 'Failed to resubscribe',
error: 'Failed to resubscribe',
},
{ status: 500 }
)

View File

@@ -70,7 +70,7 @@ export async function POST(request: NextRequest) {
return NextResponse.json(
{
success: false,
error: `Failed to connect to agent: ${clientError instanceof Error ? clientError.message : 'Unknown error'}`,
error: 'Failed to connect to agent',
},
{ status: 502 }
)
@@ -158,7 +158,7 @@ export async function POST(request: NextRequest) {
return NextResponse.json(
{
success: false,
error: `Failed to send message: ${sendError instanceof Error ? sendError.message : 'Unknown error'}`,
error: 'Failed to send message to agent',
},
{ status: 502 }
)
@@ -218,7 +218,7 @@ export async function POST(request: NextRequest) {
return NextResponse.json(
{
success: false,
error: error instanceof Error ? error.message : 'Internal server error',
error: 'Internal server error',
},
{ status: 500 }
)

View File

@@ -98,7 +98,7 @@ export async function POST(request: NextRequest) {
return NextResponse.json(
{
success: false,
error: error instanceof Error ? error.message : 'Failed to set push notification',
error: 'Failed to set push notification',
},
{ status: 500 }
)

View File

@@ -1,7 +1,13 @@
import { MongoClient } from 'mongodb'
import { validateDatabaseHost } from '@/lib/core/security/input-validation.server'
import type { MongoDBCollectionInfo, MongoDBConnectionConfig } from '@/tools/mongodb/types'
export async function createMongoDBConnection(config: MongoDBConnectionConfig) {
const hostValidation = await validateDatabaseHost(config.host, 'host')
if (!hostValidation.isValid) {
throw new Error(hostValidation.error)
}
const credentials =
config.username && config.password
? `${encodeURIComponent(config.username)}:${encodeURIComponent(config.password)}@`

View File

@@ -1,4 +1,5 @@
import mysql from 'mysql2/promise'
import { validateDatabaseHost } from '@/lib/core/security/input-validation.server'
export interface MySQLConnectionConfig {
host: string
@@ -10,6 +11,11 @@ export interface MySQLConnectionConfig {
}
export async function createMySQLConnection(config: MySQLConnectionConfig) {
const hostValidation = await validateDatabaseHost(config.host, 'host')
if (!hostValidation.isValid) {
throw new Error(hostValidation.error)
}
const connectionConfig: mysql.ConnectionOptions = {
host: config.host,
port: config.port,

View File

@@ -1,7 +1,13 @@
import neo4j from 'neo4j-driver'
import { validateDatabaseHost } from '@/lib/core/security/input-validation.server'
import type { Neo4jConnectionConfig } from '@/tools/neo4j/types'
export async function createNeo4jDriver(config: Neo4jConnectionConfig) {
const hostValidation = await validateDatabaseHost(config.host, 'host')
if (!hostValidation.isValid) {
throw new Error(hostValidation.error)
}
const isAuraHost =
config.host === 'databases.neo4j.io' || config.host.endsWith('.databases.neo4j.io')

View File

@@ -35,7 +35,7 @@ export async function POST(request: NextRequest) {
`[${requestId}] Deleting data from ${params.table} on ${params.host}:${params.port}/${params.database}`
)
const sql = createPostgresConnection({
const sql = await createPostgresConnection({
host: params.host,
port: params.port,
database: params.database,

View File

@@ -47,7 +47,7 @@ export async function POST(request: NextRequest) {
)
}
const sql = createPostgresConnection({
const sql = await createPostgresConnection({
host: params.host,
port: params.port,
database: params.database,

View File

@@ -57,7 +57,7 @@ export async function POST(request: NextRequest) {
`[${requestId}] Inserting data into ${params.table} on ${params.host}:${params.port}/${params.database}`
)
const sql = createPostgresConnection({
const sql = await createPostgresConnection({
host: params.host,
port: params.port,
database: params.database,

View File

@@ -34,7 +34,7 @@ export async function POST(request: NextRequest) {
`[${requestId}] Introspecting PostgreSQL schema on ${params.host}:${params.port}/${params.database}`
)
const sql = createPostgresConnection({
const sql = await createPostgresConnection({
host: params.host,
port: params.port,
database: params.database,

View File

@@ -34,7 +34,7 @@ export async function POST(request: NextRequest) {
`[${requestId}] Executing PostgreSQL query on ${params.host}:${params.port}/${params.database}`
)
const sql = createPostgresConnection({
const sql = await createPostgresConnection({
host: params.host,
port: params.port,
database: params.database,

View File

@@ -54,7 +54,7 @@ export async function POST(request: NextRequest) {
`[${requestId}] Updating data in ${params.table} on ${params.host}:${params.port}/${params.database}`
)
const sql = createPostgresConnection({
const sql = await createPostgresConnection({
host: params.host,
port: params.port,
database: params.database,

View File

@@ -1,7 +1,13 @@
import postgres from 'postgres'
import { validateDatabaseHost } from '@/lib/core/security/input-validation.server'
import type { PostgresConnectionConfig } from '@/tools/postgresql/types'
export function createPostgresConnection(config: PostgresConnectionConfig) {
export async function createPostgresConnection(config: PostgresConnectionConfig) {
const hostValidation = await validateDatabaseHost(config.host, 'host')
if (!hostValidation.isValid) {
throw new Error(hostValidation.error)
}
const sslConfig =
config.ssl === 'disabled'
? false

View File

@@ -3,6 +3,7 @@ import Redis from 'ioredis'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { checkInternalAuth } from '@/lib/auth/hybrid'
import { validateDatabaseHost } from '@/lib/core/security/input-validation.server'
const logger = createLogger('RedisAPI')
@@ -24,6 +25,16 @@ export async function POST(request: NextRequest) {
const body = await request.json()
const { url, command, args } = RequestSchema.parse(body)
const parsedUrl = new URL(url)
const hostname =
parsedUrl.hostname.startsWith('[') && parsedUrl.hostname.endsWith(']')
? parsedUrl.hostname.slice(1, -1)
: parsedUrl.hostname
const hostValidation = await validateDatabaseHost(hostname, 'host')
if (!hostValidation.isValid) {
return NextResponse.json({ error: hostValidation.error }, { status: 400 })
}
client = new Redis(url, {
connectTimeout: 10000,
commandTimeout: 10000,

View File

@@ -10,7 +10,6 @@ import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
import { env } from '@/lib/core/config/env'
import { getCostMultiplier, isBillingEnabled } from '@/lib/core/config/feature-flags'
import { generateRequestId } from '@/lib/core/utils/request'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { enrichTableSchema } from '@/lib/table/llm/wand'
import { verifyWorkspaceMembership } from '@/app/api/workflows/utils'
import { extractResponseText, parseResponsesUsage } from '@/providers/openai/utils'
@@ -331,14 +330,10 @@ export async function POST(req: NextRequest) {
const encoder = new TextEncoder()
const decoder = new TextDecoder()
let wandStreamClosed = false
const readable = new ReadableStream({
async start(controller) {
incrementSSEConnections('wand')
const reader = response.body?.getReader()
if (!reader) {
wandStreamClosed = true
decrementSSEConnections('wand')
controller.close()
return
}
@@ -483,18 +478,9 @@ export async function POST(req: NextRequest) {
controller.close()
} finally {
reader.releaseLock()
if (!wandStreamClosed) {
wandStreamClosed = true
decrementSSEConnections('wand')
}
}
},
cancel() {
if (!wandStreamClosed) {
wandStreamClosed = true
decrementSSEConnections('wand')
}
},
cancel() {},
})
return new Response(readable, {

View File

@@ -324,7 +324,9 @@ vi.mock('@/lib/webhooks/processor', () => ({
return null
}
),
checkWebhookPreprocessing: vi.fn().mockResolvedValue(null),
checkWebhookPreprocessing: vi
.fn()
.mockResolvedValue({ error: null, actorUserId: 'test-user-id' }),
formatProviderErrorResponse: vi.fn().mockImplementation((_webhook, error, status) => {
const { NextResponse } = require('next/server')
return NextResponse.json({ error }, { status })

View File

@@ -4,7 +4,6 @@ import { generateRequestId } from '@/lib/core/utils/request'
import {
checkWebhookPreprocessing,
findAllWebhooksForPath,
formatProviderErrorResponse,
handlePreDeploymentVerification,
handleProviderChallenges,
handleProviderReachabilityTest,
@@ -82,7 +81,6 @@ export async function POST(
requestId
)
if (authError) {
// For multi-webhook, log and continue to next webhook
if (webhooksForPath.length > 1) {
logger.warn(`[${requestId}] Auth failed for webhook ${foundWebhook.id}, continuing to next`)
continue
@@ -92,39 +90,18 @@ export async function POST(
const reachabilityResponse = handleProviderReachabilityTest(foundWebhook, body, requestId)
if (reachabilityResponse) {
// Reachability test should return immediately for the first webhook
return reachabilityResponse
}
let preprocessError: NextResponse | null = null
try {
preprocessError = await checkWebhookPreprocessing(foundWorkflow, foundWebhook, requestId)
if (preprocessError) {
if (webhooksForPath.length > 1) {
logger.warn(
`[${requestId}] Preprocessing failed for webhook ${foundWebhook.id}, continuing to next`
)
continue
}
return preprocessError
}
} catch (error) {
logger.error(`[${requestId}] Unexpected error during webhook preprocessing`, {
error: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined,
webhookId: foundWebhook.id,
workflowId: foundWorkflow.id,
})
const preprocessResult = await checkWebhookPreprocessing(foundWorkflow, foundWebhook, requestId)
if (preprocessResult.error) {
if (webhooksForPath.length > 1) {
logger.warn(
`[${requestId}] Preprocessing failed for webhook ${foundWebhook.id}, continuing to next`
)
continue
}
return formatProviderErrorResponse(
foundWebhook,
'An unexpected error occurred during preprocessing',
500
)
return preprocessResult.error
}
if (foundWebhook.blockId) {
@@ -152,6 +129,7 @@ export async function POST(
const response = await queueWebhookExecution(foundWebhook, foundWorkflow, body, request, {
requestId,
path,
actorUserId: preprocessResult.actorUserId,
})
responses.push(response)
}

View File

@@ -22,7 +22,6 @@ import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/ev
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import {
cleanupExecutionBase64Cache,
hydrateUserFilesWithBase64,
@@ -764,7 +763,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const encoder = new TextEncoder()
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
let isStreamClosed = false
let sseDecremented = false
const eventWriter = createExecutionEventWriter(executionId)
setExecutionMeta(executionId, {
@@ -775,7 +773,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
incrementSSEConnections('workflow-execute')
let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null
const sendEvent = (event: ExecutionEvent) => {
@@ -1159,10 +1156,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
if (executionId) {
await cleanupExecutionBase64Cache(executionId)
}
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('workflow-execute')
}
if (!isStreamClosed) {
try {
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
@@ -1174,10 +1167,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
cancel() {
isStreamClosed = true
logger.info(`[${requestId}] Client disconnected from SSE stream`)
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('workflow-execute')
}
},
})

View File

@@ -7,7 +7,6 @@ import {
getExecutionMeta,
readExecutionEvents,
} from '@/lib/execution/event-buffer'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { formatSSEEvent } from '@/lib/workflows/executor/execution-events'
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
@@ -74,10 +73,8 @@ export async function GET(
let closed = false
let sseDecremented = false
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
incrementSSEConnections('execution-stream-reconnect')
let lastEventId = fromEventId
const pollDeadline = Date.now() + MAX_POLL_DURATION_MS
@@ -145,20 +142,11 @@ export async function GET(
controller.close()
} catch {}
}
} finally {
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('execution-stream-reconnect')
}
}
},
cancel() {
closed = true
logger.info('Client disconnected from reconnection stream', { executionId })
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('execution-stream-reconnect')
}
},
})

View File

@@ -12,6 +12,7 @@ import {
} from '@/components/emails'
import { getSession } from '@/lib/auth'
import { decryptSecret } from '@/lib/core/security/encryption'
import { secureFetchWithValidation } from '@/lib/core/security/input-validation.server'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { sendEmail } from '@/lib/messaging/email/mailer'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
@@ -135,18 +136,18 @@ async function testWebhook(subscription: typeof workspaceNotificationSubscriptio
headers['sim-signature'] = `t=${timestamp},v1=${signature}`
}
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), 10000)
try {
const response = await fetch(webhookConfig.url, {
method: 'POST',
headers,
body,
signal: controller.signal,
})
clearTimeout(timeoutId)
const response = await secureFetchWithValidation(
webhookConfig.url,
{
method: 'POST',
headers,
body,
timeout: 10000,
allowHttp: true,
},
'webhookUrl'
)
const responseBody = await response.text().catch(() => '')
return {
@@ -157,12 +158,10 @@ async function testWebhook(subscription: typeof workspaceNotificationSubscriptio
timestamp: new Date().toISOString(),
}
} catch (error: unknown) {
clearTimeout(timeoutId)
const err = error as Error & { name?: string }
if (err.name === 'AbortError') {
return { success: false, error: 'Request timeout after 10 seconds' }
}
return { success: false, error: err.message }
logger.warn('Webhook test failed', {
error: error instanceof Error ? error.message : String(error),
})
return { success: false, error: 'Failed to deliver webhook' }
}
}
@@ -268,13 +267,15 @@ async function testSlack(
return {
success: result.ok,
error: result.error,
error: result.ok ? undefined : `Slack error: ${result.error || 'unknown'}`,
channel: result.channel,
timestamp: new Date().toISOString(),
}
} catch (error: unknown) {
const err = error as Error
return { success: false, error: err.message }
logger.warn('Slack test notification failed', {
error: error instanceof Error ? error.message : String(error),
})
return { success: false, error: 'Failed to send Slack notification' }
}
}

View File

@@ -1,18 +1,13 @@
import { db } from '@sim/db'
import { webhook, workflow as workflowTable } from '@sim/db/schema'
import { account, webhook } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { task } from '@trigger.dev/sdk'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { getHighestPrioritySubscription } from '@/lib/billing'
import {
createTimeoutAbortController,
getExecutionTimeout,
getTimeoutErrorMessage,
} from '@/lib/core/execution-limits'
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { IdempotencyService, webhookIdempotency } from '@/lib/core/idempotency'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
import { processExecutionFiles } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { WebhookAttachmentProcessor } from '@/lib/webhooks/attachment-processor'
@@ -20,7 +15,7 @@ import { fetchAndProcessAirtablePayloads, formatWebhookInput } from '@/lib/webho
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
import { getWorkflowById } from '@/lib/workflows/utils'
import { resolveOAuthAccountId } from '@/app/api/auth/oauth/utils'
import { getBlock } from '@/blocks'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
@@ -109,8 +104,8 @@ export type WebhookExecutionPayload = {
headers: Record<string, string>
path: string
blockId?: string
workspaceId?: string
credentialId?: string
credentialAccountUserId?: string
}
export async function executeWebhookJob(payload: WebhookExecutionPayload) {
@@ -143,6 +138,22 @@ export async function executeWebhookJob(payload: WebhookExecutionPayload) {
)
}
/**
* Resolve the account userId for a credential
*/
async function resolveCredentialAccountUserId(credentialId: string): Promise<string | undefined> {
const resolved = await resolveOAuthAccountId(credentialId)
if (!resolved) {
return undefined
}
const [credentialRecord] = await db
.select({ userId: account.userId })
.from(account)
.where(eq(account.id, resolved.accountId))
.limit(1)
return credentialRecord?.userId
}
async function executeWebhookJobInternal(
payload: WebhookExecutionPayload,
executionId: string,
@@ -155,17 +166,56 @@ async function executeWebhookJobInternal(
requestId
)
const userSubscription = await getHighestPrioritySubscription(payload.userId)
const asyncTimeout = getExecutionTimeout(
userSubscription?.plan as SubscriptionPlan | undefined,
'async'
)
// Resolve workflow record, billing actor, subscription, and timeout
const preprocessResult = await preprocessExecution({
workflowId: payload.workflowId,
userId: payload.userId,
triggerType: 'webhook',
executionId,
requestId,
checkRateLimit: false,
checkDeployment: false,
skipUsageLimits: true,
workspaceId: payload.workspaceId,
loggingSession,
})
if (!preprocessResult.success) {
throw new Error(preprocessResult.error?.message || 'Preprocessing failed in background job')
}
const { workflowRecord, executionTimeout } = preprocessResult
if (!workflowRecord) {
throw new Error(`Workflow ${payload.workflowId} not found during preprocessing`)
}
const workspaceId = workflowRecord.workspaceId
if (!workspaceId) {
throw new Error(`Workflow ${payload.workflowId} has no associated workspace`)
}
const workflowVariables = (workflowRecord.variables as Record<string, any>) || {}
const asyncTimeout = executionTimeout?.async ?? 120_000
const timeoutController = createTimeoutAbortController(asyncTimeout)
let deploymentVersionId: string | undefined
try {
const workflowData = await loadDeployedWorkflowState(payload.workflowId)
// Parallelize workflow state, webhook record, and credential resolution
const [workflowData, webhookRows, resolvedCredentialUserId] = await Promise.all([
loadDeployedWorkflowState(payload.workflowId, workspaceId),
db.select().from(webhook).where(eq(webhook.id, payload.webhookId)).limit(1),
payload.credentialId
? resolveCredentialAccountUserId(payload.credentialId)
: Promise.resolve(undefined),
])
const credentialAccountUserId = resolvedCredentialUserId
if (payload.credentialId && !credentialAccountUserId) {
logger.warn(
`[${requestId}] Failed to resolve credential account for credential ${payload.credentialId}`
)
}
if (!workflowData) {
throw new Error(
'Workflow state not found. The workflow may not be deployed or the deployment data may be corrupted.'
@@ -178,28 +228,11 @@ async function executeWebhookJobInternal(
? (workflowData.deploymentVersionId as string)
: undefined
const wfRows = await db
.select({ workspaceId: workflowTable.workspaceId, variables: workflowTable.variables })
.from(workflowTable)
.where(eq(workflowTable.id, payload.workflowId))
.limit(1)
const workspaceId = wfRows[0]?.workspaceId
if (!workspaceId) {
throw new Error(`Workflow ${payload.workflowId} has no associated workspace`)
}
const workflowVariables = (wfRows[0]?.variables as Record<string, any>) || {}
// Handle special Airtable case
if (payload.provider === 'airtable') {
logger.info(`[${requestId}] Processing Airtable webhook via fetchAndProcessAirtablePayloads`)
// Load the actual webhook record from database to get providerConfig
const [webhookRecord] = await db
.select()
.from(webhook)
.where(eq(webhook.id, payload.webhookId))
.limit(1)
const webhookRecord = webhookRows[0]
if (!webhookRecord) {
throw new Error(`Webhook record not found: ${payload.webhookId}`)
}
@@ -210,29 +243,20 @@ async function executeWebhookJobInternal(
providerConfig: webhookRecord.providerConfig,
}
// Create a mock workflow object for Airtable processing
const mockWorkflow = {
id: payload.workflowId,
userId: payload.userId,
}
// Get the processed Airtable input
const airtableInput = await fetchAndProcessAirtablePayloads(
webhookData,
mockWorkflow,
requestId
)
// If we got input (changes), execute the workflow like other providers
if (airtableInput) {
logger.info(`[${requestId}] Executing workflow with Airtable changes`)
// Get workflow for core execution
const workflow = await getWorkflowById(payload.workflowId)
if (!workflow) {
throw new Error(`Workflow ${payload.workflowId} not found`)
}
const metadata: ExecutionMetadata = {
requestId,
executionId,
@@ -240,13 +264,13 @@ async function executeWebhookJobInternal(
workspaceId,
userId: payload.userId,
sessionUserId: undefined,
workflowUserId: workflow.userId,
workflowUserId: workflowRecord.userId,
triggerType: payload.provider || 'webhook',
triggerBlockId: payload.blockId,
useDraftState: false,
startTime: new Date().toISOString(),
isClientSession: false,
credentialAccountUserId: payload.credentialAccountUserId,
credentialAccountUserId,
workflowStateOverride: {
blocks,
edges,
@@ -258,7 +282,7 @@ async function executeWebhookJobInternal(
const snapshot = new ExecutionSnapshot(
metadata,
workflow,
workflowRecord,
airtableInput,
workflowVariables,
[]
@@ -329,7 +353,6 @@ async function executeWebhookJobInternal(
// No changes to process
logger.info(`[${requestId}] No Airtable changes to process`)
// Start logging session so the complete call has a log entry to update
await loggingSession.safeStart({
userId: payload.userId,
workspaceId,
@@ -357,13 +380,6 @@ async function executeWebhookJobInternal(
}
// Format input for standard webhooks
// Load the actual webhook to get providerConfig (needed for Teams credentialId)
const webhookRows = await db
.select()
.from(webhook)
.where(eq(webhook.id, payload.webhookId))
.limit(1)
const actualWebhook =
webhookRows.length > 0
? webhookRows[0]
@@ -386,7 +402,6 @@ async function executeWebhookJobInternal(
if (!input && payload.provider === 'whatsapp') {
logger.info(`[${requestId}] No messages in WhatsApp payload, skipping execution`)
// Start logging session so the complete call has a log entry to update
await loggingSession.safeStart({
userId: payload.userId,
workspaceId,
@@ -452,7 +467,6 @@ async function executeWebhookJobInternal(
}
} catch (error) {
logger.error(`[${requestId}] Error processing trigger file outputs:`, error)
// Continue without processing attachments rather than failing execution
}
}
@@ -499,18 +513,11 @@ async function executeWebhookJobInternal(
}
} catch (error) {
logger.error(`[${requestId}] Error processing generic webhook files:`, error)
// Continue without processing files rather than failing execution
}
}
logger.info(`[${requestId}] Executing workflow for ${payload.provider} webhook`)
// Get workflow for core execution
const workflow = await getWorkflowById(payload.workflowId)
if (!workflow) {
throw new Error(`Workflow ${payload.workflowId} not found`)
}
const metadata: ExecutionMetadata = {
requestId,
executionId,
@@ -518,13 +525,13 @@ async function executeWebhookJobInternal(
workspaceId,
userId: payload.userId,
sessionUserId: undefined,
workflowUserId: workflow.userId,
workflowUserId: workflowRecord.userId,
triggerType: payload.provider || 'webhook',
triggerBlockId: payload.blockId,
useDraftState: false,
startTime: new Date().toISOString(),
isClientSession: false,
credentialAccountUserId: payload.credentialAccountUserId,
credentialAccountUserId,
workflowStateOverride: {
blocks,
edges,
@@ -536,7 +543,13 @@ async function executeWebhookJobInternal(
const triggerInput = input || {}
const snapshot = new ExecutionSnapshot(metadata, workflow, triggerInput, workflowVariables, [])
const snapshot = new ExecutionSnapshot(
metadata,
workflowRecord,
triggerInput,
workflowVariables,
[]
)
const executionResult = await executeWorkflowCore({
snapshot,
@@ -611,23 +624,9 @@ async function executeWebhookJobInternal(
})
try {
const wfRow = await db
.select({ workspaceId: workflowTable.workspaceId })
.from(workflowTable)
.where(eq(workflowTable.id, payload.workflowId))
.limit(1)
const errorWorkspaceId = wfRow[0]?.workspaceId
if (!errorWorkspaceId) {
logger.warn(
`[${requestId}] Cannot log error: workflow ${payload.workflowId} has no workspace`
)
throw error
}
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: errorWorkspaceId,
workspaceId,
variables: {},
triggerData: {
isTest: false,

View File

@@ -19,6 +19,7 @@ import { checkUsageStatus } from '@/lib/billing/calculations/usage-monitor'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { RateLimiter } from '@/lib/core/rate-limiter'
import { decryptSecret } from '@/lib/core/security/encryption'
import { secureFetchWithValidation } from '@/lib/core/security/input-validation.server'
import { formatDuration } from '@/lib/core/utils/formatting'
import { getBaseUrl } from '@/lib/core/utils/urls'
import type { TraceSpan, WorkflowExecutionLog } from '@/lib/logs/types'
@@ -207,18 +208,18 @@ async function deliverWebhook(
headers['sim-signature'] = `t=${payload.timestamp},v1=${signature}`
}
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), 30000)
try {
const response = await fetch(webhookConfig.url, {
method: 'POST',
headers,
body,
signal: controller.signal,
})
clearTimeout(timeoutId)
const response = await secureFetchWithValidation(
webhookConfig.url,
{
method: 'POST',
headers,
body,
timeout: 30000,
allowHttp: true,
},
'webhookUrl'
)
return {
success: response.ok,
@@ -226,11 +227,13 @@ async function deliverWebhook(
error: response.ok ? undefined : `HTTP ${response.status}`,
}
} catch (error: unknown) {
clearTimeout(timeoutId)
const err = error as Error & { name?: string }
logger.warn('Webhook delivery failed', {
error: error instanceof Error ? error.message : String(error),
webhookUrl: webhookConfig.url,
})
return {
success: false,
error: err.name === 'AbortError' ? 'Request timeout' : err.message,
error: 'Failed to deliver webhook',
}
}
}

View File

@@ -9,7 +9,7 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
authMode: AuthMode.ApiKey,
longDescription:
'Integrate Parallel AI into the workflow. Can search the web, extract information from URLs, and conduct deep research.',
docsLink: 'https://docs.parallel.ai/',
docsLink: 'https://docs.sim.ai/tools/parallel-ai',
category: 'tools',
bgColor: '#E0E0E0',
icon: ParallelIcon,
@@ -56,7 +56,7 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
title: 'Extract Objective',
type: 'long-input',
placeholder: 'What information to extract from the URLs?',
required: true,
required: false,
condition: { field: 'operation', value: 'extract' },
},
{
@@ -89,6 +89,37 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
required: true,
condition: { field: 'operation', value: 'deep_research' },
},
{
id: 'search_mode',
title: 'Search Mode',
type: 'dropdown',
options: [
{ label: 'One-Shot', id: 'one-shot' },
{ label: 'Agentic', id: 'agentic' },
{ label: 'Fast', id: 'fast' },
],
value: () => 'one-shot',
condition: { field: 'operation', value: 'search' },
mode: 'advanced',
},
{
id: 'search_include_domains',
title: 'Include Domains',
type: 'short-input',
placeholder: 'Comma-separated domains to include (e.g., .edu, example.com)',
required: false,
condition: { field: 'operation', value: 'search' },
mode: 'advanced',
},
{
id: 'search_exclude_domains',
title: 'Exclude Domains',
type: 'short-input',
placeholder: 'Comma-separated domains to exclude',
required: false,
condition: { field: 'operation', value: 'search' },
mode: 'advanced',
},
{
id: 'include_domains',
title: 'Include Domains',
@@ -96,6 +127,7 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
placeholder: 'Comma-separated domains to include',
required: false,
condition: { field: 'operation', value: 'deep_research' },
mode: 'advanced',
},
{
id: 'exclude_domains',
@@ -104,37 +136,37 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
placeholder: 'Comma-separated domains to exclude',
required: false,
condition: { field: 'operation', value: 'deep_research' },
mode: 'advanced',
},
{
id: 'processor',
title: 'Processor',
title: 'Research Processor',
type: 'dropdown',
options: [
{ label: 'Lite', id: 'lite' },
{ label: 'Base', id: 'base' },
{ label: 'Core', id: 'core' },
{ label: 'Core 2x', id: 'core2x' },
{ label: 'Pro', id: 'pro' },
{ label: 'Ultra', id: 'ultra' },
{ label: 'Ultra 2x', id: 'ultra2x' },
{ label: 'Ultra 4x', id: 'ultra4x' },
{ label: 'Pro Fast', id: 'pro-fast' },
{ label: 'Ultra Fast', id: 'ultra-fast' },
],
value: () => 'base',
condition: { field: 'operation', value: ['search', 'deep_research'] },
value: () => 'pro',
condition: { field: 'operation', value: 'deep_research' },
mode: 'advanced',
},
{
id: 'max_results',
title: 'Max Results',
type: 'short-input',
placeholder: '5',
placeholder: '10',
condition: { field: 'operation', value: 'search' },
mode: 'advanced',
},
{
id: 'max_chars_per_result',
title: 'Max Chars',
title: 'Max Chars Per Result',
type: 'short-input',
placeholder: '1500',
condition: { field: 'operation', value: 'search' },
mode: 'advanced',
},
{
id: 'apiKey',
@@ -149,8 +181,6 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
access: ['parallel_search', 'parallel_extract', 'parallel_deep_research'],
config: {
tool: (params) => {
if (params.extract_objective) params.objective = params.extract_objective
if (params.research_input) params.input = params.research_input
switch (params.operation) {
case 'search':
return 'parallel_search'
@@ -174,21 +204,30 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
.filter((query: string) => query.length > 0)
if (queries.length > 0) {
result.search_queries = queries
} else {
result.search_queries = undefined
}
}
if (params.search_mode && params.search_mode !== 'one-shot') {
result.mode = params.search_mode
}
if (params.max_results) result.max_results = Number(params.max_results)
if (params.max_chars_per_result) {
result.max_chars_per_result = Number(params.max_chars_per_result)
}
result.include_domains = params.search_include_domains || undefined
result.exclude_domains = params.search_exclude_domains || undefined
}
if (operation === 'extract') {
if (params.extract_objective) result.objective = params.extract_objective
result.excerpts = !(params.excerpts === 'false' || params.excerpts === false)
result.full_content = params.full_content === 'true' || params.full_content === true
}
if (operation === 'deep_research') {
if (params.research_input) result.input = params.research_input
if (params.processor) result.processor = params.processor
}
return result
},
},
@@ -202,29 +241,34 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
excerpts: { type: 'boolean', description: 'Include excerpts' },
full_content: { type: 'boolean', description: 'Include full content' },
research_input: { type: 'string', description: 'Deep research query' },
include_domains: { type: 'string', description: 'Domains to include' },
exclude_domains: { type: 'string', description: 'Domains to exclude' },
processor: { type: 'string', description: 'Processing method' },
include_domains: { type: 'string', description: 'Domains to include (deep research)' },
exclude_domains: { type: 'string', description: 'Domains to exclude (deep research)' },
search_include_domains: { type: 'string', description: 'Domains to include (search)' },
search_exclude_domains: { type: 'string', description: 'Domains to exclude (search)' },
search_mode: { type: 'string', description: 'Search mode (one-shot, agentic, fast)' },
processor: { type: 'string', description: 'Research processing tier' },
max_results: { type: 'number', description: 'Maximum number of results' },
max_chars_per_result: { type: 'number', description: 'Maximum characters per result' },
apiKey: { type: 'string', description: 'Parallel AI API key' },
},
outputs: {
results: { type: 'string', description: 'Search or extract results (JSON stringified)' },
results: {
type: 'json',
description: 'Search or extract results (array of url, title, excerpts)',
},
search_id: { type: 'string', description: 'Search request ID (for search)' },
extract_id: { type: 'string', description: 'Extract request ID (for extract)' },
status: { type: 'string', description: 'Task status (for deep research)' },
run_id: { type: 'string', description: 'Task run ID (for deep research)' },
message: { type: 'string', description: 'Status message (for deep research)' },
content: {
type: 'string',
description: 'Research content (for deep research, JSON stringified)',
type: 'json',
description: 'Research content (for deep research, structured based on output_schema)',
},
basis: {
type: 'string',
description: 'Citations and sources (for deep research, JSON stringified)',
},
metadata: {
type: 'string',
description: 'Task metadata (for deep research, JSON stringified)',
type: 'json',
description:
'Citations and sources with field, reasoning, citations, confidence (for deep research)',
},
},
}

View File

@@ -7,6 +7,7 @@ import {
ClientFactoryOptions,
} from '@a2a-js/sdk/client'
import { createLogger } from '@sim/logger'
import { validateUrlWithDNS } from '@/lib/core/security/input-validation.server'
import { isInternalFileUrl } from '@/lib/uploads/utils/file-utils'
import { A2A_TERMINAL_STATES } from './constants'
@@ -43,6 +44,11 @@ class ApiKeyInterceptor implements CallInterceptor {
* Tries standard path first, falls back to root URL for compatibility.
*/
export async function createA2AClient(agentUrl: string, apiKey?: string): Promise<Client> {
const validation = await validateUrlWithDNS(agentUrl, 'agentUrl')
if (!validation.isValid) {
throw new Error(validation.error || 'Agent URL validation failed')
}
const factoryOptions = apiKey
? ClientFactoryOptions.createFrom(ClientFactoryOptions.default, {
clientConfig: {

View File

@@ -8,6 +8,7 @@ import {
isLegacyApiKeyFormat,
} from '@/lib/api-key/crypto'
import { env } from '@/lib/core/config/env'
import { safeCompare } from '@/lib/core/security/encryption'
const logger = createLogger('ApiKeyAuth')
@@ -39,7 +40,7 @@ export async function authenticateApiKey(inputKey: string, storedKey: string): P
if (isEncryptedKey(storedKey)) {
try {
const { decrypted } = await decryptApiKey(storedKey)
return inputKey === decrypted
return safeCompare(inputKey, decrypted)
} catch (decryptError) {
logger.error('Failed to decrypt stored API key:', { error: decryptError })
return false
@@ -54,27 +55,27 @@ export async function authenticateApiKey(inputKey: string, storedKey: string): P
if (isEncryptedKey(storedKey)) {
try {
const { decrypted } = await decryptApiKey(storedKey)
return inputKey === decrypted
return safeCompare(inputKey, decrypted)
} catch (decryptError) {
logger.error('Failed to decrypt stored API key:', { error: decryptError })
// Fall through to plain text comparison if decryption fails
}
}
// Legacy format can match against plain text storage
return inputKey === storedKey
return safeCompare(inputKey, storedKey)
}
// If no recognized prefix, fall back to original behavior
if (isEncryptedKey(storedKey)) {
try {
const { decrypted } = await decryptApiKey(storedKey)
return inputKey === decrypted
return safeCompare(inputKey, decrypted)
} catch (decryptError) {
logger.error('Failed to decrypt stored API key:', { error: decryptError })
}
}
return inputKey === storedKey
return safeCompare(inputKey, storedKey)
} catch (error) {
logger.error('API key authentication error:', { error })
return false

View File

@@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger'
import { jwtVerify, SignJWT } from 'jose'
import { type NextRequest, NextResponse } from 'next/server'
import { env } from '@/lib/core/config/env'
import { safeCompare } from '@/lib/core/security/encryption'
const logger = createLogger('CronAuth')
@@ -81,7 +82,8 @@ export function verifyCronAuth(request: NextRequest, context?: string): NextResp
const authHeader = request.headers.get('authorization')
const expectedAuth = `Bearer ${env.CRON_SECRET}`
if (authHeader !== expectedAuth) {
const isValid = authHeader !== null && safeCompare(authHeader, expectedAuth)
if (!isValid) {
const contextInfo = context ? ` for ${context}` : ''
logger.warn(`Unauthorized CRON access attempt${contextInfo}`, {
providedAuth: authHeader,

View File

@@ -1,5 +1,6 @@
import type { NextRequest } from 'next/server'
import { env } from '@/lib/core/config/env'
import { safeCompare } from '@/lib/core/security/encryption'
export function checkInternalApiKey(req: NextRequest) {
const apiKey = req.headers.get('x-api-key')
@@ -13,7 +14,7 @@ export function checkInternalApiKey(req: NextRequest) {
return { success: false, error: 'API key required' }
}
if (apiKey !== expectedApiKey) {
if (!safeCompare(apiKey, expectedApiKey)) {
return { success: false, error: 'Invalid API key' }
}

View File

@@ -81,7 +81,9 @@ export function setDeploymentAuthCookie(
}
/**
* Adds CORS headers to allow cross-origin requests for embedded deployments
* Adds CORS headers to allow cross-origin requests for embedded deployments.
* Embedded chat widgets and forms are designed to run on any customer domain,
* so we reflect the requesting origin rather than restricting to an allowlist.
*/
export function addCorsHeaders(response: NextResponse, request: NextRequest): NextResponse {
const origin = request.headers.get('origin') || ''

View File

@@ -1,4 +1,4 @@
import { createCipheriv, createDecipheriv, randomBytes, timingSafeEqual } from 'crypto'
import { createCipheriv, createDecipheriv, createHmac, randomBytes, timingSafeEqual } from 'crypto'
import { createLogger } from '@sim/logger'
import { env } from '@/lib/core/config/env'
@@ -91,8 +91,8 @@ export function generatePassword(length = 24): string {
* @returns True if strings are equal, false otherwise
*/
export function safeCompare(a: string, b: string): boolean {
if (a.length !== b.length) {
return false
}
return timingSafeEqual(Buffer.from(a), Buffer.from(b))
const key = 'safeCompare'
const ha = createHmac('sha256', key).update(a).digest()
const hb = createHmac('sha256', key).update(b).digest()
return timingSafeEqual(ha, hb)
}

View File

@@ -54,9 +54,10 @@ function isPrivateOrReservedIP(ip: string): boolean {
*/
export async function validateUrlWithDNS(
url: string | null | undefined,
paramName = 'url'
paramName = 'url',
options: { allowHttp?: boolean } = {}
): Promise<AsyncValidationResult> {
const basicValidation = validateExternalUrl(url, paramName)
const basicValidation = validateExternalUrl(url, paramName, options)
if (!basicValidation.isValid) {
return basicValidation
}
@@ -88,7 +89,10 @@ export async function validateUrlWithDNS(
return ip === '127.0.0.1' || ip === '::1'
})()
if (isPrivateOrReservedIP(address) && !(isLocalhost && resolvedIsLoopback)) {
if (
isPrivateOrReservedIP(address) &&
!(isLocalhost && resolvedIsLoopback && !options.allowHttp)
) {
logger.warn('URL resolves to blocked IP address', {
paramName,
hostname,
@@ -118,6 +122,70 @@ export async function validateUrlWithDNS(
}
}
/**
* Validates a database hostname by resolving DNS and checking the resolved IP
* against private/reserved ranges to prevent SSRF via database connections.
*
* Unlike validateHostname (which enforces strict RFC hostname format), this
* function is permissive about hostname format to avoid breaking legitimate
* database hostnames (e.g. underscores in Docker/K8s service names). It only
* blocks localhost and private/reserved IPs.
*
* @param host - The database hostname to validate
* @param paramName - Name of the parameter for error messages
* @returns AsyncValidationResult with resolved IP
*/
export async function validateDatabaseHost(
host: string | null | undefined,
paramName = 'host'
): Promise<AsyncValidationResult> {
if (!host) {
return { isValid: false, error: `${paramName} is required` }
}
const lowerHost = host.toLowerCase()
if (lowerHost === 'localhost') {
return { isValid: false, error: `${paramName} cannot be localhost` }
}
if (ipaddr.isValid(lowerHost) && isPrivateOrReservedIP(lowerHost)) {
return { isValid: false, error: `${paramName} cannot be a private IP address` }
}
try {
const { address } = await dns.lookup(host, { verbatim: true })
if (isPrivateOrReservedIP(address)) {
logger.warn('Database host resolves to blocked IP address', {
paramName,
hostname: host,
resolvedIP: address,
})
return {
isValid: false,
error: `${paramName} resolves to a blocked IP address`,
}
}
return {
isValid: true,
resolvedIP: address,
originalHostname: host,
}
} catch (error) {
logger.warn('DNS lookup failed for database host', {
paramName,
hostname: host,
error: error instanceof Error ? error.message : String(error),
})
return {
isValid: false,
error: `${paramName} hostname could not be resolved`,
}
}
}
export interface SecureFetchOptions {
method?: string
headers?: Record<string, string>
@@ -183,7 +251,7 @@ function resolveRedirectUrl(baseUrl: string, location: string): string {
export async function secureFetchWithPinnedIP(
url: string,
resolvedIP: string,
options: SecureFetchOptions = {},
options: SecureFetchOptions & { allowHttp?: boolean } = {},
redirectCount = 0
): Promise<SecureFetchResponse> {
const maxRedirects = options.maxRedirects ?? DEFAULT_MAX_REDIRECTS
@@ -231,7 +299,7 @@ export async function secureFetchWithPinnedIP(
res.resume()
const redirectUrl = resolveRedirectUrl(url, location)
validateUrlWithDNS(redirectUrl, 'redirectUrl')
validateUrlWithDNS(redirectUrl, 'redirectUrl', { allowHttp: options.allowHttp })
.then((validation) => {
if (!validation.isValid) {
reject(new Error(`Redirect blocked: ${validation.error}`))
@@ -340,10 +408,12 @@ export async function secureFetchWithPinnedIP(
*/
export async function secureFetchWithValidation(
url: string,
options: SecureFetchOptions = {},
options: SecureFetchOptions & { allowHttp?: boolean } = {},
paramName = 'url'
): Promise<SecureFetchResponse> {
const validation = await validateUrlWithDNS(url, paramName)
const validation = await validateUrlWithDNS(url, paramName, {
allowHttp: options.allowHttp,
})
if (!validation.isValid) {
throw new Error(validation.error)
}

View File

@@ -676,7 +676,8 @@ export function validateJiraIssueKey(
*/
export function validateExternalUrl(
url: string | null | undefined,
paramName = 'url'
paramName = 'url',
options: { allowHttp?: boolean } = {}
): ValidationResult {
if (!url || typeof url !== 'string') {
return {
@@ -709,7 +710,20 @@ export function validateExternalUrl(
}
}
if (protocol !== 'https:' && !(protocol === 'http:' && isLocalhost)) {
if (options.allowHttp) {
if (protocol !== 'https:' && protocol !== 'http:') {
return {
isValid: false,
error: `${paramName} must use http:// or https:// protocol`,
}
}
if (isLocalhost) {
return {
isValid: false,
error: `${paramName} cannot point to localhost`,
}
}
} else if (protocol !== 'https:' && !(protocol === 'http:' && isLocalhost)) {
return {
isValid: false,
error: `${paramName} must use https:// protocol`,

View File

@@ -246,7 +246,7 @@ describe('categorizeError', () => {
const error = new Error('Server not accessible')
const result = categorizeError(error)
expect(result.status).toBe(404)
expect(result.message).toBe('Server not accessible')
expect(result.message).toBe('Resource not found')
})
it.concurrent('returns 401 for authentication errors', () => {
@@ -267,28 +267,28 @@ describe('categorizeError', () => {
const error = new Error('Invalid parameter provided')
const result = categorizeError(error)
expect(result.status).toBe(400)
expect(result.message).toBe('Invalid parameter provided')
expect(result.message).toBe('Invalid request parameters')
})
it.concurrent('returns 400 for missing required errors', () => {
const error = new Error('Missing required field: name')
const result = categorizeError(error)
expect(result.status).toBe(400)
expect(result.message).toBe('Missing required field: name')
expect(result.message).toBe('Invalid request parameters')
})
it.concurrent('returns 400 for validation errors', () => {
const error = new Error('Validation failed for input')
const result = categorizeError(error)
expect(result.status).toBe(400)
expect(result.message).toBe('Validation failed for input')
expect(result.message).toBe('Invalid request parameters')
})
it.concurrent('returns 500 for generic errors', () => {
const error = new Error('Something went wrong')
const result = categorizeError(error)
expect(result.status).toBe(500)
expect(result.message).toBe('Something went wrong')
expect(result.message).toBe('Internal server error')
})
it.concurrent('returns 500 for non-Error objects', () => {

View File

@@ -49,18 +49,18 @@ export const MCP_CLIENT_CONSTANTS = {
} as const
/**
* Create standardized MCP error response
* Create standardized MCP error response.
* Always returns the defaultMessage to clients to prevent leaking internal error details.
* Callers are responsible for logging the original error before calling this function.
*/
export function createMcpErrorResponse(
error: unknown,
_error: unknown,
defaultMessage: string,
status = 500
): NextResponse {
const errorMessage = error instanceof Error ? error.message : defaultMessage
const response: McpApiResponse = {
success: false,
error: errorMessage,
error: defaultMessage,
}
return NextResponse.json(response, { status })
@@ -115,36 +115,33 @@ export function validateRequiredFields(
}
/**
* Enhanced error categorization for more specific HTTP status codes
* Enhanced error categorization for more specific HTTP status codes.
* Returns safe, generic messages to prevent leaking internal details.
*/
export function categorizeError(error: unknown): { message: string; status: number } {
if (!(error instanceof Error)) {
return { message: 'Unknown error occurred', status: 500 }
}
const message = error.message.toLowerCase()
const msg = error.message.toLowerCase()
if (message.includes('timeout')) {
if (msg.includes('timeout')) {
return { message: 'Request timed out', status: 408 }
}
if (message.includes('not found') || message.includes('not accessible')) {
return { message: error.message, status: 404 }
if (msg.includes('not found') || msg.includes('not accessible')) {
return { message: 'Resource not found', status: 404 }
}
if (message.includes('authentication') || message.includes('unauthorized')) {
if (msg.includes('authentication') || msg.includes('unauthorized')) {
return { message: 'Authentication required', status: 401 }
}
if (
message.includes('invalid') ||
message.includes('missing required') ||
message.includes('validation')
) {
return { message: error.message, status: 400 }
if (msg.includes('invalid') || msg.includes('missing required') || msg.includes('validation')) {
return { message: 'Invalid request parameters', status: 400 }
}
return { message: error.message, status: 500 }
return { message: 'Internal server error', status: 500 }
}
/**

View File

@@ -1,16 +1,10 @@
/**
* Periodic memory telemetry for diagnosing heap growth in production.
* Logs process.memoryUsage(), V8 heap stats, and active SSE connection
* counts every 60s, enabling correlation between connection leaks and
* memory spikes.
* Periodic memory telemetry for monitoring heap growth in production.
* Logs process.memoryUsage() and V8 heap stats every 60s.
*/
import v8 from 'node:v8'
import { createLogger } from '@sim/logger'
import {
getActiveSSEConnectionCount,
getActiveSSEConnectionsByRoute,
} from '@/lib/monitoring/sse-connections'
const logger = createLogger('MemoryTelemetry', { logLevel: 'INFO' })
@@ -23,16 +17,6 @@ export function startMemoryTelemetry(intervalMs = 60_000) {
started = true
const timer = setInterval(() => {
// Trigger opportunistic (non-blocking) garbage collection if running on Bun.
// This signals JSC GC + mimalloc page purge without blocking the event loop,
// helping reclaim RSS that mimalloc otherwise retains under sustained load.
const bunGlobal = (globalThis as Record<string, unknown>).Bun as
| { gc?: (force: boolean) => void }
| undefined
if (typeof bunGlobal?.gc === 'function') {
bunGlobal.gc(false)
}
const mem = process.memoryUsage()
const heap = v8.getHeapStatistics()
@@ -49,8 +33,6 @@ export function startMemoryTelemetry(intervalMs = 60_000) {
? process.getActiveResourcesInfo().length
: -1,
uptimeMin: Math.round(process.uptime() / 60),
activeSSEConnections: getActiveSSEConnectionCount(),
sseByRoute: getActiveSSEConnectionsByRoute(),
})
}, intervalMs)
timer.unref()

View File

@@ -1,27 +0,0 @@
/**
* Tracks active SSE connections by route for memory leak diagnostics.
* Logged alongside periodic memory telemetry to correlate connection
* counts with heap growth.
*/
const connections = new Map<string, number>()
export function incrementSSEConnections(route: string) {
connections.set(route, (connections.get(route) ?? 0) + 1)
}
export function decrementSSEConnections(route: string) {
const count = (connections.get(route) ?? 0) - 1
if (count <= 0) connections.delete(route)
else connections.set(route, count)
}
export function getActiveSSEConnectionCount(): number {
let total = 0
for (const count of connections.values()) total += count
return total
}
export function getActiveSSEConnectionsByRoute(): Record<string, number> {
return Object.fromEntries(connections)
}

View File

@@ -1,5 +1,5 @@
import { db, webhook, workflow, workflowDeploymentVersion } from '@sim/db'
import { account, credentialSet, subscription } from '@sim/db/schema'
import { credentialSet, subscription } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull, or } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
@@ -7,6 +7,7 @@ import { v4 as uuidv4 } from 'uuid'
import { checkEnterprisePlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils'
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import { isProd } from '@/lib/core/config/feature-flags'
import { safeCompare } from '@/lib/core/security/encryption'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { convertSquareBracketsToTwiML } from '@/lib/webhooks/utils'
@@ -25,8 +26,6 @@ import {
validateTypeformSignature,
verifyProviderWebhook,
} from '@/lib/webhooks/utils.server'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
import { resolveOAuthAccountId } from '@/app/api/auth/oauth/utils'
import { executeWebhookJob } from '@/background/webhook-execution'
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
import { isConfluencePayloadMatch } from '@/triggers/confluence/utils'
@@ -40,6 +39,12 @@ export interface WebhookProcessorOptions {
requestId: string
path?: string
webhookId?: string
actorUserId?: string
}
export interface WebhookPreprocessingResult {
error: NextResponse | null
actorUserId?: string
}
function getExternalUrl(request: NextRequest): string {
@@ -800,14 +805,14 @@ export async function verifyProviderAuth(
if (secretHeaderName) {
const headerValue = request.headers.get(secretHeaderName.toLowerCase())
if (headerValue === configToken) {
if (headerValue && safeCompare(headerValue, configToken)) {
isTokenValid = true
}
} else {
const authHeader = request.headers.get('authorization')
if (authHeader?.toLowerCase().startsWith('bearer ')) {
const token = authHeader.substring(7)
if (token === configToken) {
if (safeCompare(token, configToken)) {
isTokenValid = true
}
}
@@ -835,7 +840,7 @@ export async function checkWebhookPreprocessing(
foundWorkflow: any,
foundWebhook: any,
requestId: string
): Promise<NextResponse | null> {
): Promise<WebhookPreprocessingResult> {
try {
const executionId = uuidv4()
@@ -848,6 +853,7 @@ export async function checkWebhookPreprocessing(
checkRateLimit: true,
checkDeployment: true,
workspaceId: foundWorkflow.workspaceId,
workflowRecord: foundWorkflow,
})
if (!preprocessResult.success) {
@@ -859,33 +865,39 @@ export async function checkWebhookPreprocessing(
})
if (foundWebhook.provider === 'microsoft-teams') {
return NextResponse.json(
{
type: 'message',
text: error.message,
},
{ status: error.statusCode }
)
return {
error: NextResponse.json(
{
type: 'message',
text: error.message,
},
{ status: error.statusCode }
),
}
}
return NextResponse.json({ error: error.message }, { status: error.statusCode })
return { error: NextResponse.json({ error: error.message }, { status: error.statusCode }) }
}
return null
return { error: null, actorUserId: preprocessResult.actorUserId }
} catch (preprocessError) {
logger.error(`[${requestId}] Error during webhook preprocessing:`, preprocessError)
if (foundWebhook.provider === 'microsoft-teams') {
return NextResponse.json(
{
type: 'message',
text: 'Internal error during preprocessing',
},
{ status: 500 }
)
return {
error: NextResponse.json(
{
type: 'message',
text: 'Internal error during preprocessing',
},
{ status: 500 }
),
}
}
return NextResponse.json({ error: 'Internal error during preprocessing' }, { status: 500 })
return {
error: NextResponse.json({ error: 'Internal error during preprocessing' }, { status: 500 }),
}
}
}
@@ -1059,22 +1071,7 @@ export async function queueWebhookExecution(
// Note: Each webhook now has its own credentialId (credential sets are fanned out at save time)
const providerConfig = (foundWebhook.providerConfig as Record<string, any>) || {}
const credentialId = providerConfig.credentialId as string | undefined
let credentialAccountUserId: string | undefined
if (credentialId) {
const resolved = await resolveOAuthAccountId(credentialId)
if (!resolved) {
logger.error(
`[${options.requestId}] Failed to resolve OAuth account for credential ${credentialId}`
)
return formatProviderErrorResponse(foundWebhook, 'Failed to resolve credential', 500)
}
const [credentialRecord] = await db
.select({ userId: account.userId })
.from(account)
.where(eq(account.id, resolved.accountId))
.limit(1)
credentialAccountUserId = credentialRecord?.userId
}
// credentialSetId is a direct field on webhook table, not in providerConfig
const credentialSetId = foundWebhook.credentialSetId as string | undefined
@@ -1089,16 +1086,9 @@ export async function queueWebhookExecution(
}
}
if (!foundWorkflow.workspaceId) {
logger.error(`[${options.requestId}] Workflow ${foundWorkflow.id} has no workspaceId`)
return NextResponse.json({ error: 'Workflow has no associated workspace' }, { status: 500 })
}
const actorUserId = await getWorkspaceBilledAccountUserId(foundWorkflow.workspaceId)
const actorUserId = options.actorUserId
if (!actorUserId) {
logger.error(
`[${options.requestId}] No billing account for workspace ${foundWorkflow.workspaceId}`
)
logger.error(`[${options.requestId}] No actorUserId provided for webhook ${foundWebhook.id}`)
return NextResponse.json({ error: 'Unable to resolve billing account' }, { status: 500 })
}
@@ -1111,8 +1101,8 @@ export async function queueWebhookExecution(
headers,
path: options.path || foundWebhook.path,
blockId: foundWebhook.blockId,
workspaceId: foundWorkflow.workspaceId,
...(credentialId ? { credentialId } : {}),
...(credentialAccountUserId ? { credentialAccountUserId } : {}),
}
const jobQueue = await getJobQueue()
@@ -1166,6 +1156,12 @@ export async function queueWebhookExecution(
})
}
// Slack requires an empty 200 for interactive payloads (view_submission, block_actions, etc.)
// A JSON body like {"message":"..."} is not a recognized response format and causes modal errors
if (foundWebhook.provider === 'slack') {
return new NextResponse(null, { status: 200 })
}
// Twilio Voice requires TwiML XML response
if (foundWebhook.provider === 'twilio_voice') {
const providerConfig = (foundWebhook.providerConfig as Record<string, any>) || {}
@@ -1211,6 +1207,12 @@ export async function queueWebhookExecution(
)
}
if (foundWebhook.provider === 'slack') {
// Return empty 200 to avoid Slack showing an error dialog to the user,
// even though processing failed. The error is already logged above.
return new NextResponse(null, { status: 200 })
}
if (foundWebhook.provider === 'twilio_voice') {
const errorTwiml = `<?xml version="1.0" encoding="UTF-8"?>
<Response>

View File

@@ -1,5 +1,6 @@
import type { IncomingMessage, ServerResponse } from 'http'
import { env } from '@/lib/core/config/env'
import { safeCompare } from '@/lib/core/security/encryption'
import type { IRoomManager } from '@/socket/rooms'
interface Logger {
@@ -21,7 +22,8 @@ function checkInternalApiKey(req: IncomingMessage): { success: boolean; error?:
return { success: false, error: 'API key required' }
}
if (apiKey !== expectedApiKey) {
const apiKeyStr = Array.isArray(apiKey) ? apiKey[0] : apiKey
if (!apiKeyStr || !safeCompare(apiKeyStr, expectedApiKey)) {
return { success: false, error: 'Invalid API key' }
}

View File

@@ -8,7 +8,7 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
id: 'parallel_deep_research',
name: 'Parallel AI Deep Research',
description:
'Conduct comprehensive deep research across the web using Parallel AI. Synthesizes information from multiple sources with citations. Can take up to 15 minutes to complete.',
'Conduct comprehensive deep research across the web using Parallel AI. Synthesizes information from multiple sources with citations. Can take up to 45 minutes to complete.',
version: '1.0.0',
params: {
@@ -22,8 +22,7 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
type: 'string',
required: false,
visibility: 'user-only',
description:
'Compute level: base, lite, pro, ultra, ultra2x, ultra4x, ultra8x (default: base)',
description: 'Processing tier: pro, ultra, pro-fast, ultra-fast (default: pro)',
},
include_domains: {
type: 'string',
@@ -55,15 +54,12 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
body: (params) => {
const body: Record<string, unknown> = {
input: params.input,
processor: params.processor || 'base',
processor: params.processor || 'pro',
task_spec: {
output_schema: 'auto',
},
}
const taskSpec: Record<string, unknown> = {}
taskSpec.output_schema = 'auto'
body.task_spec = taskSpec
if (params.include_domains || params.exclude_domains) {
const sourcePolicy: Record<string, string[]> = {}
@@ -91,14 +87,21 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
},
transformResponse: async (response: Response) => {
if (!response.ok) {
const errorText = await response.text()
throw new Error(
`Parallel AI deep research task creation failed: ${response.status} - ${errorText}`
)
}
const data = await response.json()
return {
success: true,
output: {
run_id: data.run_id,
status: data.status,
message: `Research task ${data.status}, waiting for completion...`,
run_id: data.run_id ?? null,
status: data.status ?? null,
message: `Research task ${data.status ?? 'created'}, waiting for completion...`,
content: {},
basis: [],
},
@@ -122,13 +125,16 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
logger.info(`Parallel AI deep research task ${runId} created, fetching results...`)
try {
const resultResponse = await fetch(`https://api.parallel.ai/v1/tasks/runs/${runId}/result`, {
method: 'GET',
headers: {
'x-api-key': params.apiKey,
'Content-Type': 'application/json',
},
})
const resultResponse = await fetch(
`https://api.parallel.ai/v1/tasks/runs/${String(runId).trim()}/result`,
{
method: 'GET',
headers: {
'x-api-key': params.apiKey,
'Content-Type': 'application/json',
},
}
)
if (!resultResponse.ok) {
const errorText = await resultResponse.text()
@@ -138,17 +144,17 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
const taskResult = await resultResponse.json()
logger.info(`Parallel AI deep research task ${runId} completed`)
const output = taskResult.output || {}
const run = taskResult.run || {}
const output = taskResult.output ?? {}
const status = taskResult.status ?? 'completed'
return {
success: true,
output: {
status: run.status || 'completed',
status,
run_id: runId,
message: 'Research completed successfully',
content: output.content || {},
basis: output.basis || [],
content: output.content ?? {},
basis: output.basis ?? [],
},
}
} catch (error: unknown) {
@@ -169,7 +175,7 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
outputs: {
status: {
type: 'string',
description: 'Task status (completed, failed)',
description: 'Task status (completed, failed, running)',
},
run_id: {
type: 'string',
@@ -189,7 +195,7 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
items: {
type: 'object',
properties: {
field: { type: 'string', description: 'Output field name' },
field: { type: 'string', description: 'Output field dot-notation path' },
reasoning: { type: 'string', description: 'Explanation for the result' },
citations: {
type: 'array',
@@ -203,7 +209,7 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
},
},
},
confidence: { type: 'string', description: 'Confidence level indicator' },
confidence: { type: 'string', description: 'Confidence level (high, medium)' },
},
},
},

View File

@@ -17,21 +17,21 @@ export const extractTool: ToolConfig<ParallelExtractParams, ToolResponse> = {
},
objective: {
type: 'string',
required: true,
required: false,
visibility: 'user-or-llm',
description: 'What information to extract from the provided URLs',
},
excerpts: {
type: 'boolean',
required: true,
visibility: 'user-only',
description: 'Include relevant excerpts from the content',
required: false,
visibility: 'user-or-llm',
description: 'Include relevant excerpts from the content (default: true)',
},
full_content: {
type: 'boolean',
required: true,
visibility: 'user-only',
description: 'Include full page content',
required: false,
visibility: 'user-or-llm',
description: 'Include full page content as markdown (default: false)',
},
apiKey: {
type: 'string',
@@ -50,7 +50,6 @@ export const extractTool: ToolConfig<ParallelExtractParams, ToolResponse> = {
'parallel-beta': 'search-extract-2025-10-10',
}),
body: (params) => {
// Convert comma-separated URLs to array
const urlArray = params.urls
.split(',')
.map((url) => url.trim())
@@ -58,10 +57,9 @@ export const extractTool: ToolConfig<ParallelExtractParams, ToolResponse> = {
const body: Record<string, unknown> = {
urls: urlArray,
objective: params.objective,
}
// Add optional parameters if provided
if (params.objective) body.objective = params.objective
if (params.excerpts !== undefined) body.excerpts = params.excerpts
if (params.full_content !== undefined) body.full_content = params.full_content
@@ -70,17 +68,44 @@ export const extractTool: ToolConfig<ParallelExtractParams, ToolResponse> = {
},
transformResponse: async (response: Response) => {
if (!response.ok) {
const errorText = await response.text()
throw new Error(`Parallel AI extract failed: ${response.status} - ${errorText}`)
}
const data = await response.json()
if (!data.results) {
return {
success: false,
error: 'No results returned from extraction',
output: {
results: [],
extract_id: data.extract_id ?? null,
},
}
}
return {
success: true,
output: {
results: data.results || [],
extract_id: data.extract_id ?? null,
results: data.results.map((result: Record<string, unknown>) => ({
url: result.url ?? null,
title: result.title ?? null,
publish_date: result.publish_date ?? null,
excerpts: result.excerpts ?? [],
full_content: result.full_content ?? null,
})),
},
}
},
outputs: {
extract_id: {
type: 'string',
description: 'Unique identifier for this extraction request',
},
results: {
type: 'array',
description: 'Extracted information from the provided URLs',
@@ -88,12 +113,22 @@ export const extractTool: ToolConfig<ParallelExtractParams, ToolResponse> = {
type: 'object',
properties: {
url: { type: 'string', description: 'The source URL' },
title: { type: 'string', description: 'The title of the page' },
content: { type: 'string', description: 'Extracted content' },
title: { type: 'string', description: 'The title of the page', optional: true },
publish_date: {
type: 'string',
description: 'Publication date (YYYY-MM-DD)',
optional: true,
},
excerpts: {
type: 'array',
description: 'Relevant text excerpts',
description: 'Relevant text excerpts in markdown',
items: { type: 'string' },
optional: true,
},
full_content: {
type: 'string',
description: 'Full page content as markdown',
optional: true,
},
},
},

View File

@@ -5,3 +5,5 @@ import { searchTool } from '@/tools/parallel/search'
export const parallelSearchTool = searchTool
export const parallelExtractTool = extractTool
export const parallelDeepResearchTool = deepResearchTool
export * from './types'

View File

@@ -19,25 +19,37 @@ export const searchTool: ToolConfig<ParallelSearchParams, ToolResponse> = {
type: 'string',
required: false,
visibility: 'user-or-llm',
description: 'Optional comma-separated list of search queries to execute',
description: 'Comma-separated list of search queries to execute',
},
processor: {
mode: {
type: 'string',
required: false,
visibility: 'user-only',
description: 'Processing method: base or pro (default: base)',
description: 'Search mode: one-shot, agentic, or fast (default: one-shot)',
},
max_results: {
type: 'number',
required: false,
visibility: 'user-only',
description: 'Maximum number of results to return (default: 5)',
description: 'Maximum number of results to return (default: 10)',
},
max_chars_per_result: {
type: 'number',
required: false,
visibility: 'user-only',
description: 'Maximum characters per result (default: 1500)',
description: 'Maximum characters per result excerpt (minimum: 1000)',
},
include_domains: {
type: 'string',
required: false,
visibility: 'user-or-llm',
description: 'Comma-separated list of domains to restrict search results to',
},
exclude_domains: {
type: 'string',
required: false,
visibility: 'user-or-llm',
description: 'Comma-separated list of domains to exclude from search results',
},
apiKey: {
type: 'string',
@@ -60,44 +72,83 @@ export const searchTool: ToolConfig<ParallelSearchParams, ToolResponse> = {
objective: params.objective,
}
// Only include search_queries if it's not empty
if (
params.search_queries !== undefined &&
params.search_queries !== null &&
params.search_queries.length > 0
) {
body.search_queries = params.search_queries
if (params.search_queries) {
if (Array.isArray(params.search_queries)) {
body.search_queries = params.search_queries
} else if (typeof params.search_queries === 'string') {
const queries = params.search_queries
.split(',')
.map((q: string) => q.trim())
.filter((q: string) => q.length > 0)
if (queries.length > 0) body.search_queries = queries
}
}
// Add optional parameters if provided
if (params.processor) body.processor = params.processor
if (params.mode) body.mode = params.mode
if (params.max_results) body.max_results = Number(params.max_results)
if (params.max_chars_per_result)
body.max_chars_per_result = Number(params.max_chars_per_result)
if (params.max_chars_per_result) {
body.excerpts = { max_chars_per_result: Number(params.max_chars_per_result) }
}
const sourcePolicy: Record<string, string[]> = {}
if (params.include_domains) {
sourcePolicy.include_domains = params.include_domains
.split(',')
.map((d: string) => d.trim())
.filter((d: string) => d.length > 0)
}
if (params.exclude_domains) {
sourcePolicy.exclude_domains = params.exclude_domains
.split(',')
.map((d: string) => d.trim())
.filter((d: string) => d.length > 0)
}
if (Object.keys(sourcePolicy).length > 0) {
body.source_policy = sourcePolicy
}
return body
},
},
transformResponse: async (response: Response) => {
if (!response.ok) {
const errorText = await response.text()
throw new Error(`Parallel AI search failed: ${response.status} - ${errorText}`)
}
const data = await response.json()
if (!data.results) {
return {
success: false,
error: 'No results returned from search',
output: {
results: [],
search_id: data.search_id ?? null,
},
}
}
return {
success: true,
output: {
results: data.results.map((result: unknown) => {
const resultObj = result as Record<string, unknown>
return {
url: resultObj.url || '',
title: resultObj.title || '',
excerpts: resultObj.excerpts || [],
}
}),
search_id: data.search_id ?? null,
results: data.results.map((result: Record<string, unknown>) => ({
url: result.url ?? null,
title: result.title ?? null,
publish_date: result.publish_date ?? null,
excerpts: result.excerpts ?? [],
})),
},
}
},
outputs: {
search_id: {
type: 'string',
description: 'Unique identifier for this search request',
},
results: {
type: 'array',
description: 'Search results with excerpts from relevant pages',
@@ -106,9 +157,14 @@ export const searchTool: ToolConfig<ParallelSearchParams, ToolResponse> = {
properties: {
url: { type: 'string', description: 'The URL of the search result' },
title: { type: 'string', description: 'The title of the search result' },
publish_date: {
type: 'string',
description: 'Publication date of the page (YYYY-MM-DD)',
optional: true,
},
excerpts: {
type: 'array',
description: 'Text excerpts from the page',
description: 'LLM-optimized excerpts from the page',
items: { type: 'string' },
},
},

View File

@@ -1,39 +1,51 @@
import type { ToolResponse } from '@/tools/types'
export interface ParallelSearchParams {
objective: string
search_queries: string[]
processor?: string
search_queries?: string[] | string
mode?: string
max_results?: number
max_chars_per_result?: number
include_domains?: string
exclude_domains?: string
apiKey: string
}
export interface ParallelSearchResult {
url: string
title: string
url: string | null
title: string | null
publish_date?: string | null
excerpts: string[]
}
export interface ParallelSearchResponse {
results: ParallelSearchResult[]
export interface ParallelSearchResponse extends ToolResponse {
output: {
search_id: string | null
results: ParallelSearchResult[]
}
}
export interface ParallelExtractParams {
urls: string
objective: string
excerpts: boolean
full_content: boolean
objective?: string
excerpts?: boolean
full_content?: boolean
apiKey: string
}
export interface ParallelExtractResult {
url: string
title: string
content?: string
url: string | null
title?: string | null
publish_date?: string | null
excerpts?: string[]
full_content?: string | null
}
export interface ParallelExtractResponse {
results: ParallelExtractResult[]
export interface ParallelExtractResponse extends ToolResponse {
output: {
extract_id: string | null
results: ParallelExtractResult[]
}
}
export interface ParallelDeepResearchParams {
@@ -45,17 +57,22 @@ export interface ParallelDeepResearchParams {
}
export interface ParallelDeepResearchBasis {
url: string
title: string
excerpt: string
confidence?: number
field: string
reasoning: string
citations: {
url: string
title: string
excerpts: string[]
}[]
confidence: string
}
export interface ParallelDeepResearchResponse {
status: string
run_id: string
message?: string
content?: Record<string, unknown>
basis?: ParallelDeepResearchBasis[]
metadata?: Record<string, unknown>
export interface ParallelDeepResearchResponse extends ToolResponse {
output: {
status: string
run_id: string
message: string
content: Record<string, unknown>
basis: ParallelDeepResearchBasis[]
}
}

View File

@@ -25,7 +25,7 @@ export const genericWebhookTrigger: TriggerConfig = {
title: 'Require Authentication',
type: 'switch',
description: 'Require authentication for all webhook requests',
defaultValue: false,
defaultValue: true,
mode: 'trigger',
},
{
@@ -36,6 +36,7 @@ export const genericWebhookTrigger: TriggerConfig = {
description: 'Token used to authenticate webhook requests via Bearer token or custom header',
password: true,
required: false,
value: () => crypto.randomUUID(),
mode: 'trigger',
},
{