mirror of
https://github.com/simstudioai/sim.git
synced 2026-03-15 03:00:33 -04:00
Compare commits
27 Commits
feat/migra
...
v0.5.110
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1c2c2c65d4 | ||
|
|
19ef526886 | ||
|
|
ff2a1527ab | ||
|
|
2e1c639a81 | ||
|
|
ecd3536a72 | ||
|
|
635179d696 | ||
|
|
f88926a6a8 | ||
|
|
690b47a0bf | ||
|
|
158d5236bc | ||
|
|
8c0a2e04b1 | ||
|
|
6586c5ce40 | ||
|
|
3ce947566d | ||
|
|
70c36cb7aa | ||
|
|
f1ec5fe824 | ||
|
|
e07e3c34cc | ||
|
|
0d2e6ff31d | ||
|
|
4fd0989264 | ||
|
|
67f8a687f6 | ||
|
|
af592349d3 | ||
|
|
0d86ea01f0 | ||
|
|
115f04e989 | ||
|
|
34d92fae89 | ||
|
|
67aa4bb332 | ||
|
|
15ace5e63f | ||
|
|
fdca73679d | ||
|
|
da46a387c9 | ||
|
|
b7e377ec4b |
@@ -44,20 +44,24 @@ Search the web using Parallel AI. Provides comprehensive search results with int
|
||||
| Parameter | Type | Required | Description |
|
||||
| --------- | ---- | -------- | ----------- |
|
||||
| `objective` | string | Yes | The search objective or question to answer |
|
||||
| `search_queries` | string | No | Optional comma-separated list of search queries to execute |
|
||||
| `processor` | string | No | Processing method: base or pro \(default: base\) |
|
||||
| `max_results` | number | No | Maximum number of results to return \(default: 5\) |
|
||||
| `max_chars_per_result` | number | No | Maximum characters per result \(default: 1500\) |
|
||||
| `search_queries` | string | No | Comma-separated list of search queries to execute |
|
||||
| `mode` | string | No | Search mode: one-shot, agentic, or fast \(default: one-shot\) |
|
||||
| `max_results` | number | No | Maximum number of results to return \(default: 10\) |
|
||||
| `max_chars_per_result` | number | No | Maximum characters per result excerpt \(minimum: 1000\) |
|
||||
| `include_domains` | string | No | Comma-separated list of domains to restrict search results to |
|
||||
| `exclude_domains` | string | No | Comma-separated list of domains to exclude from search results |
|
||||
| `apiKey` | string | Yes | Parallel AI API Key |
|
||||
|
||||
#### Output
|
||||
|
||||
| Parameter | Type | Description |
|
||||
| --------- | ---- | ----------- |
|
||||
| `search_id` | string | Unique identifier for this search request |
|
||||
| `results` | array | Search results with excerpts from relevant pages |
|
||||
| ↳ `url` | string | The URL of the search result |
|
||||
| ↳ `title` | string | The title of the search result |
|
||||
| ↳ `excerpts` | array | Text excerpts from the page |
|
||||
| ↳ `publish_date` | string | Publication date of the page \(YYYY-MM-DD\) |
|
||||
| ↳ `excerpts` | array | LLM-optimized excerpts from the page |
|
||||
|
||||
### `parallel_extract`
|
||||
|
||||
@@ -68,31 +72,33 @@ Extract targeted information from specific URLs using Parallel AI. Processes pro
|
||||
| Parameter | Type | Required | Description |
|
||||
| --------- | ---- | -------- | ----------- |
|
||||
| `urls` | string | Yes | Comma-separated list of URLs to extract information from |
|
||||
| `objective` | string | Yes | What information to extract from the provided URLs |
|
||||
| `excerpts` | boolean | Yes | Include relevant excerpts from the content |
|
||||
| `full_content` | boolean | Yes | Include full page content |
|
||||
| `objective` | string | No | What information to extract from the provided URLs |
|
||||
| `excerpts` | boolean | No | Include relevant excerpts from the content \(default: true\) |
|
||||
| `full_content` | boolean | No | Include full page content as markdown \(default: false\) |
|
||||
| `apiKey` | string | Yes | Parallel AI API Key |
|
||||
|
||||
#### Output
|
||||
|
||||
| Parameter | Type | Description |
|
||||
| --------- | ---- | ----------- |
|
||||
| `extract_id` | string | Unique identifier for this extraction request |
|
||||
| `results` | array | Extracted information from the provided URLs |
|
||||
| ↳ `url` | string | The source URL |
|
||||
| ↳ `title` | string | The title of the page |
|
||||
| ↳ `content` | string | Extracted content |
|
||||
| ↳ `excerpts` | array | Relevant text excerpts |
|
||||
| ↳ `publish_date` | string | Publication date \(YYYY-MM-DD\) |
|
||||
| ↳ `excerpts` | array | Relevant text excerpts in markdown |
|
||||
| ↳ `full_content` | string | Full page content as markdown |
|
||||
|
||||
### `parallel_deep_research`
|
||||
|
||||
Conduct comprehensive deep research across the web using Parallel AI. Synthesizes information from multiple sources with citations. Can take up to 15 minutes to complete.
|
||||
Conduct comprehensive deep research across the web using Parallel AI. Synthesizes information from multiple sources with citations. Can take up to 45 minutes to complete.
|
||||
|
||||
#### Input
|
||||
|
||||
| Parameter | Type | Required | Description |
|
||||
| --------- | ---- | -------- | ----------- |
|
||||
| `input` | string | Yes | Research query or question \(up to 15,000 characters\) |
|
||||
| `processor` | string | No | Compute level: base, lite, pro, ultra, ultra2x, ultra4x, ultra8x \(default: base\) |
|
||||
| `processor` | string | No | Processing tier: pro, ultra, pro-fast, ultra-fast \(default: pro\) |
|
||||
| `include_domains` | string | No | Comma-separated list of domains to restrict research to \(source policy\) |
|
||||
| `exclude_domains` | string | No | Comma-separated list of domains to exclude from research \(source policy\) |
|
||||
| `apiKey` | string | Yes | Parallel AI API Key |
|
||||
@@ -101,17 +107,17 @@ Conduct comprehensive deep research across the web using Parallel AI. Synthesize
|
||||
|
||||
| Parameter | Type | Description |
|
||||
| --------- | ---- | ----------- |
|
||||
| `status` | string | Task status \(completed, failed\) |
|
||||
| `status` | string | Task status \(completed, failed, running\) |
|
||||
| `run_id` | string | Unique ID for this research task |
|
||||
| `message` | string | Status message |
|
||||
| `content` | object | Research results \(structured based on output_schema\) |
|
||||
| `basis` | array | Citations and sources with reasoning and confidence levels |
|
||||
| ↳ `field` | string | Output field name |
|
||||
| ↳ `field` | string | Output field dot-notation path |
|
||||
| ↳ `reasoning` | string | Explanation for the result |
|
||||
| ↳ `citations` | array | Array of sources |
|
||||
| ↳ `url` | string | Source URL |
|
||||
| ↳ `title` | string | Source title |
|
||||
| ↳ `excerpts` | array | Relevant excerpts from the source |
|
||||
| ↳ `confidence` | string | Confidence level indicator |
|
||||
| ↳ `confidence` | string | Confidence level \(high, medium\) |
|
||||
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ import { validateUrlWithDNS } from '@/lib/core/security/input-validation.server'
|
||||
import { SSE_HEADERS } from '@/lib/core/utils/sse'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
import { markExecutionCancelled } from '@/lib/execution/cancellation'
|
||||
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
|
||||
import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
|
||||
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
|
||||
import {
|
||||
@@ -631,11 +630,9 @@ async function handleMessageStream(
|
||||
}
|
||||
|
||||
const encoder = new TextEncoder()
|
||||
let messageStreamDecremented = false
|
||||
|
||||
const stream = new ReadableStream({
|
||||
async start(controller) {
|
||||
incrementSSEConnections('a2a-message')
|
||||
const sendEvent = (event: string, data: unknown) => {
|
||||
try {
|
||||
const jsonRpcResponse = {
|
||||
@@ -845,19 +842,10 @@ async function handleMessageStream(
|
||||
})
|
||||
} finally {
|
||||
await releaseLock(lockKey, lockValue)
|
||||
if (!messageStreamDecremented) {
|
||||
messageStreamDecremented = true
|
||||
decrementSSEConnections('a2a-message')
|
||||
}
|
||||
controller.close()
|
||||
}
|
||||
},
|
||||
cancel() {
|
||||
if (!messageStreamDecremented) {
|
||||
messageStreamDecremented = true
|
||||
decrementSSEConnections('a2a-message')
|
||||
}
|
||||
},
|
||||
cancel() {},
|
||||
})
|
||||
|
||||
return new NextResponse(stream, {
|
||||
@@ -1042,22 +1030,16 @@ async function handleTaskResubscribe(
|
||||
{ once: true }
|
||||
)
|
||||
|
||||
let sseDecremented = false
|
||||
const cleanup = () => {
|
||||
isCancelled = true
|
||||
if (pollTimeoutId) {
|
||||
clearTimeout(pollTimeoutId)
|
||||
pollTimeoutId = null
|
||||
}
|
||||
if (!sseDecremented) {
|
||||
sseDecremented = true
|
||||
decrementSSEConnections('a2a-resubscribe')
|
||||
}
|
||||
}
|
||||
|
||||
const stream = new ReadableStream({
|
||||
async start(controller) {
|
||||
incrementSSEConnections('a2a-resubscribe')
|
||||
const sendEvent = (event: string, data: unknown): boolean => {
|
||||
if (isCancelled || abortSignal.aborted) return false
|
||||
try {
|
||||
|
||||
@@ -14,7 +14,6 @@ import { getSession } from '@/lib/auth'
|
||||
import { SSE_HEADERS } from '@/lib/core/utils/sse'
|
||||
import { mcpConnectionManager } from '@/lib/mcp/connection-manager'
|
||||
import { mcpPubSub } from '@/lib/mcp/pubsub'
|
||||
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
|
||||
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
|
||||
|
||||
const logger = createLogger('McpEventsSSE')
|
||||
@@ -50,14 +49,11 @@ export async function GET(request: NextRequest) {
|
||||
for (const unsub of unsubscribers) {
|
||||
unsub()
|
||||
}
|
||||
decrementSSEConnections('mcp-events')
|
||||
logger.info(`SSE connection closed for workspace ${workspaceId}`)
|
||||
}
|
||||
|
||||
const stream = new ReadableStream({
|
||||
start(controller) {
|
||||
incrementSSEConnections('mcp-events')
|
||||
|
||||
const send = (eventName: string, data: Record<string, unknown>) => {
|
||||
if (cleaned) return
|
||||
try {
|
||||
|
||||
@@ -192,7 +192,8 @@ export const POST = withMcpAuth<{ id: string }>('read')(
|
||||
)
|
||||
} catch (error) {
|
||||
connectionStatus = 'error'
|
||||
lastError = error instanceof Error ? error.message : 'Connection test failed'
|
||||
lastError =
|
||||
error instanceof Error ? error.message.split('\n')[0].slice(0, 200) : 'Connection failed'
|
||||
logger.warn(`[${requestId}] Failed to connect to server ${serverId}:`, error)
|
||||
}
|
||||
|
||||
|
||||
@@ -41,6 +41,20 @@ interface TestConnectionResult {
|
||||
warnings?: string[]
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts a user-friendly error message from connection errors.
|
||||
* Keeps diagnostic info (timeout, DNS, HTTP status) but strips
|
||||
* verbose internals (Zod details, full response bodies, stack traces).
|
||||
*/
|
||||
function sanitizeConnectionError(error: unknown): string {
|
||||
if (!(error instanceof Error)) {
|
||||
return 'Unknown connection error'
|
||||
}
|
||||
|
||||
const firstLine = error.message.split('\n')[0]
|
||||
return firstLine.length > 200 ? `${firstLine.slice(0, 200)}...` : firstLine
|
||||
}
|
||||
|
||||
/**
|
||||
* POST - Test connection to an MCP server before registering it
|
||||
*/
|
||||
@@ -137,8 +151,7 @@ export const POST = withMcpAuth('write')(
|
||||
} catch (toolError) {
|
||||
logger.warn(`[${requestId}] Connection established but could not list tools:`, toolError)
|
||||
result.success = false
|
||||
const errorMessage = toolError instanceof Error ? toolError.message : 'Unknown error'
|
||||
result.error = `Connection established but could not list tools: ${errorMessage}`
|
||||
result.error = 'Connection established but could not list tools'
|
||||
result.warnings = result.warnings || []
|
||||
result.warnings.push(
|
||||
'Server connected but tool listing failed - connection may be incomplete'
|
||||
@@ -163,11 +176,7 @@ export const POST = withMcpAuth('write')(
|
||||
logger.warn(`[${requestId}] MCP server test failed:`, error)
|
||||
|
||||
result.success = false
|
||||
if (error instanceof Error) {
|
||||
result.error = error.message
|
||||
} else {
|
||||
result.error = 'Unknown connection error'
|
||||
}
|
||||
result.error = sanitizeConnectionError(error)
|
||||
} finally {
|
||||
if (client) {
|
||||
try {
|
||||
|
||||
@@ -89,11 +89,12 @@ export const POST = withMcpAuth('read')(
|
||||
tool = tools.find((t) => t.name === toolName) ?? null
|
||||
|
||||
if (!tool) {
|
||||
logger.warn(`[${requestId}] Tool ${toolName} not found on server ${serverId}`, {
|
||||
availableTools: tools.map((t) => t.name),
|
||||
})
|
||||
return createMcpErrorResponse(
|
||||
new Error(
|
||||
`Tool ${toolName} not found on server ${serverId}. Available tools: ${tools.map((t) => t.name).join(', ')}`
|
||||
),
|
||||
'Tool not found',
|
||||
new Error('Tool not found'),
|
||||
'Tool not found on the specified server',
|
||||
404
|
||||
)
|
||||
}
|
||||
|
||||
@@ -76,7 +76,7 @@ export async function POST(request: NextRequest) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to cancel task',
|
||||
error: 'Failed to cancel task',
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
|
||||
@@ -86,7 +86,7 @@ export async function POST(request: NextRequest) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to delete push notification',
|
||||
error: 'Failed to delete push notification',
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
|
||||
@@ -84,7 +84,7 @@ export async function POST(request: NextRequest) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to fetch Agent Card',
|
||||
error: 'Failed to fetch Agent Card',
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
|
||||
@@ -107,7 +107,7 @@ export async function POST(request: NextRequest) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to get push notification',
|
||||
error: 'Failed to get push notification',
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
|
||||
@@ -87,7 +87,7 @@ export async function POST(request: NextRequest) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to get task',
|
||||
error: 'Failed to get task',
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
|
||||
@@ -111,7 +111,7 @@ export async function POST(request: NextRequest) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to resubscribe',
|
||||
error: 'Failed to resubscribe',
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
|
||||
@@ -70,7 +70,7 @@ export async function POST(request: NextRequest) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
error: `Failed to connect to agent: ${clientError instanceof Error ? clientError.message : 'Unknown error'}`,
|
||||
error: 'Failed to connect to agent',
|
||||
},
|
||||
{ status: 502 }
|
||||
)
|
||||
@@ -158,7 +158,7 @@ export async function POST(request: NextRequest) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
error: `Failed to send message: ${sendError instanceof Error ? sendError.message : 'Unknown error'}`,
|
||||
error: 'Failed to send message to agent',
|
||||
},
|
||||
{ status: 502 }
|
||||
)
|
||||
@@ -218,7 +218,7 @@ export async function POST(request: NextRequest) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Internal server error',
|
||||
error: 'Internal server error',
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
|
||||
@@ -98,7 +98,7 @@ export async function POST(request: NextRequest) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Failed to set push notification',
|
||||
error: 'Failed to set push notification',
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
import { MongoClient } from 'mongodb'
|
||||
import { validateDatabaseHost } from '@/lib/core/security/input-validation.server'
|
||||
import type { MongoDBCollectionInfo, MongoDBConnectionConfig } from '@/tools/mongodb/types'
|
||||
|
||||
export async function createMongoDBConnection(config: MongoDBConnectionConfig) {
|
||||
const hostValidation = await validateDatabaseHost(config.host, 'host')
|
||||
if (!hostValidation.isValid) {
|
||||
throw new Error(hostValidation.error)
|
||||
}
|
||||
|
||||
const credentials =
|
||||
config.username && config.password
|
||||
? `${encodeURIComponent(config.username)}:${encodeURIComponent(config.password)}@`
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import mysql from 'mysql2/promise'
|
||||
import { validateDatabaseHost } from '@/lib/core/security/input-validation.server'
|
||||
|
||||
export interface MySQLConnectionConfig {
|
||||
host: string
|
||||
@@ -10,6 +11,11 @@ export interface MySQLConnectionConfig {
|
||||
}
|
||||
|
||||
export async function createMySQLConnection(config: MySQLConnectionConfig) {
|
||||
const hostValidation = await validateDatabaseHost(config.host, 'host')
|
||||
if (!hostValidation.isValid) {
|
||||
throw new Error(hostValidation.error)
|
||||
}
|
||||
|
||||
const connectionConfig: mysql.ConnectionOptions = {
|
||||
host: config.host,
|
||||
port: config.port,
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
import neo4j from 'neo4j-driver'
|
||||
import { validateDatabaseHost } from '@/lib/core/security/input-validation.server'
|
||||
import type { Neo4jConnectionConfig } from '@/tools/neo4j/types'
|
||||
|
||||
export async function createNeo4jDriver(config: Neo4jConnectionConfig) {
|
||||
const hostValidation = await validateDatabaseHost(config.host, 'host')
|
||||
if (!hostValidation.isValid) {
|
||||
throw new Error(hostValidation.error)
|
||||
}
|
||||
|
||||
const isAuraHost =
|
||||
config.host === 'databases.neo4j.io' || config.host.endsWith('.databases.neo4j.io')
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ export async function POST(request: NextRequest) {
|
||||
`[${requestId}] Deleting data from ${params.table} on ${params.host}:${params.port}/${params.database}`
|
||||
)
|
||||
|
||||
const sql = createPostgresConnection({
|
||||
const sql = await createPostgresConnection({
|
||||
host: params.host,
|
||||
port: params.port,
|
||||
database: params.database,
|
||||
|
||||
@@ -47,7 +47,7 @@ export async function POST(request: NextRequest) {
|
||||
)
|
||||
}
|
||||
|
||||
const sql = createPostgresConnection({
|
||||
const sql = await createPostgresConnection({
|
||||
host: params.host,
|
||||
port: params.port,
|
||||
database: params.database,
|
||||
|
||||
@@ -57,7 +57,7 @@ export async function POST(request: NextRequest) {
|
||||
`[${requestId}] Inserting data into ${params.table} on ${params.host}:${params.port}/${params.database}`
|
||||
)
|
||||
|
||||
const sql = createPostgresConnection({
|
||||
const sql = await createPostgresConnection({
|
||||
host: params.host,
|
||||
port: params.port,
|
||||
database: params.database,
|
||||
|
||||
@@ -34,7 +34,7 @@ export async function POST(request: NextRequest) {
|
||||
`[${requestId}] Introspecting PostgreSQL schema on ${params.host}:${params.port}/${params.database}`
|
||||
)
|
||||
|
||||
const sql = createPostgresConnection({
|
||||
const sql = await createPostgresConnection({
|
||||
host: params.host,
|
||||
port: params.port,
|
||||
database: params.database,
|
||||
|
||||
@@ -34,7 +34,7 @@ export async function POST(request: NextRequest) {
|
||||
`[${requestId}] Executing PostgreSQL query on ${params.host}:${params.port}/${params.database}`
|
||||
)
|
||||
|
||||
const sql = createPostgresConnection({
|
||||
const sql = await createPostgresConnection({
|
||||
host: params.host,
|
||||
port: params.port,
|
||||
database: params.database,
|
||||
|
||||
@@ -54,7 +54,7 @@ export async function POST(request: NextRequest) {
|
||||
`[${requestId}] Updating data in ${params.table} on ${params.host}:${params.port}/${params.database}`
|
||||
)
|
||||
|
||||
const sql = createPostgresConnection({
|
||||
const sql = await createPostgresConnection({
|
||||
host: params.host,
|
||||
port: params.port,
|
||||
database: params.database,
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
import postgres from 'postgres'
|
||||
import { validateDatabaseHost } from '@/lib/core/security/input-validation.server'
|
||||
import type { PostgresConnectionConfig } from '@/tools/postgresql/types'
|
||||
|
||||
export function createPostgresConnection(config: PostgresConnectionConfig) {
|
||||
export async function createPostgresConnection(config: PostgresConnectionConfig) {
|
||||
const hostValidation = await validateDatabaseHost(config.host, 'host')
|
||||
if (!hostValidation.isValid) {
|
||||
throw new Error(hostValidation.error)
|
||||
}
|
||||
|
||||
const sslConfig =
|
||||
config.ssl === 'disabled'
|
||||
? false
|
||||
|
||||
@@ -3,6 +3,7 @@ import Redis from 'ioredis'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { checkInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { validateDatabaseHost } from '@/lib/core/security/input-validation.server'
|
||||
|
||||
const logger = createLogger('RedisAPI')
|
||||
|
||||
@@ -24,6 +25,16 @@ export async function POST(request: NextRequest) {
|
||||
const body = await request.json()
|
||||
const { url, command, args } = RequestSchema.parse(body)
|
||||
|
||||
const parsedUrl = new URL(url)
|
||||
const hostname =
|
||||
parsedUrl.hostname.startsWith('[') && parsedUrl.hostname.endsWith(']')
|
||||
? parsedUrl.hostname.slice(1, -1)
|
||||
: parsedUrl.hostname
|
||||
const hostValidation = await validateDatabaseHost(hostname, 'host')
|
||||
if (!hostValidation.isValid) {
|
||||
return NextResponse.json({ error: hostValidation.error }, { status: 400 })
|
||||
}
|
||||
|
||||
client = new Redis(url, {
|
||||
connectTimeout: 10000,
|
||||
commandTimeout: 10000,
|
||||
|
||||
@@ -10,7 +10,6 @@ import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import { getCostMultiplier, isBillingEnabled } from '@/lib/core/config/feature-flags'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
|
||||
import { enrichTableSchema } from '@/lib/table/llm/wand'
|
||||
import { verifyWorkspaceMembership } from '@/app/api/workflows/utils'
|
||||
import { extractResponseText, parseResponsesUsage } from '@/providers/openai/utils'
|
||||
@@ -331,14 +330,10 @@ export async function POST(req: NextRequest) {
|
||||
const encoder = new TextEncoder()
|
||||
const decoder = new TextDecoder()
|
||||
|
||||
let wandStreamClosed = false
|
||||
const readable = new ReadableStream({
|
||||
async start(controller) {
|
||||
incrementSSEConnections('wand')
|
||||
const reader = response.body?.getReader()
|
||||
if (!reader) {
|
||||
wandStreamClosed = true
|
||||
decrementSSEConnections('wand')
|
||||
controller.close()
|
||||
return
|
||||
}
|
||||
@@ -483,18 +478,9 @@ export async function POST(req: NextRequest) {
|
||||
controller.close()
|
||||
} finally {
|
||||
reader.releaseLock()
|
||||
if (!wandStreamClosed) {
|
||||
wandStreamClosed = true
|
||||
decrementSSEConnections('wand')
|
||||
}
|
||||
}
|
||||
},
|
||||
cancel() {
|
||||
if (!wandStreamClosed) {
|
||||
wandStreamClosed = true
|
||||
decrementSSEConnections('wand')
|
||||
}
|
||||
},
|
||||
cancel() {},
|
||||
})
|
||||
|
||||
return new Response(readable, {
|
||||
|
||||
@@ -324,7 +324,9 @@ vi.mock('@/lib/webhooks/processor', () => ({
|
||||
return null
|
||||
}
|
||||
),
|
||||
checkWebhookPreprocessing: vi.fn().mockResolvedValue(null),
|
||||
checkWebhookPreprocessing: vi
|
||||
.fn()
|
||||
.mockResolvedValue({ error: null, actorUserId: 'test-user-id' }),
|
||||
formatProviderErrorResponse: vi.fn().mockImplementation((_webhook, error, status) => {
|
||||
const { NextResponse } = require('next/server')
|
||||
return NextResponse.json({ error }, { status })
|
||||
|
||||
@@ -4,7 +4,6 @@ import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import {
|
||||
checkWebhookPreprocessing,
|
||||
findAllWebhooksForPath,
|
||||
formatProviderErrorResponse,
|
||||
handlePreDeploymentVerification,
|
||||
handleProviderChallenges,
|
||||
handleProviderReachabilityTest,
|
||||
@@ -82,7 +81,6 @@ export async function POST(
|
||||
requestId
|
||||
)
|
||||
if (authError) {
|
||||
// For multi-webhook, log and continue to next webhook
|
||||
if (webhooksForPath.length > 1) {
|
||||
logger.warn(`[${requestId}] Auth failed for webhook ${foundWebhook.id}, continuing to next`)
|
||||
continue
|
||||
@@ -92,39 +90,18 @@ export async function POST(
|
||||
|
||||
const reachabilityResponse = handleProviderReachabilityTest(foundWebhook, body, requestId)
|
||||
if (reachabilityResponse) {
|
||||
// Reachability test should return immediately for the first webhook
|
||||
return reachabilityResponse
|
||||
}
|
||||
|
||||
let preprocessError: NextResponse | null = null
|
||||
try {
|
||||
preprocessError = await checkWebhookPreprocessing(foundWorkflow, foundWebhook, requestId)
|
||||
if (preprocessError) {
|
||||
if (webhooksForPath.length > 1) {
|
||||
logger.warn(
|
||||
`[${requestId}] Preprocessing failed for webhook ${foundWebhook.id}, continuing to next`
|
||||
)
|
||||
continue
|
||||
}
|
||||
return preprocessError
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Unexpected error during webhook preprocessing`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
webhookId: foundWebhook.id,
|
||||
workflowId: foundWorkflow.id,
|
||||
})
|
||||
|
||||
const preprocessResult = await checkWebhookPreprocessing(foundWorkflow, foundWebhook, requestId)
|
||||
if (preprocessResult.error) {
|
||||
if (webhooksForPath.length > 1) {
|
||||
logger.warn(
|
||||
`[${requestId}] Preprocessing failed for webhook ${foundWebhook.id}, continuing to next`
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
return formatProviderErrorResponse(
|
||||
foundWebhook,
|
||||
'An unexpected error occurred during preprocessing',
|
||||
500
|
||||
)
|
||||
return preprocessResult.error
|
||||
}
|
||||
|
||||
if (foundWebhook.blockId) {
|
||||
@@ -152,6 +129,7 @@ export async function POST(
|
||||
const response = await queueWebhookExecution(foundWebhook, foundWorkflow, body, request, {
|
||||
requestId,
|
||||
path,
|
||||
actorUserId: preprocessResult.actorUserId,
|
||||
})
|
||||
responses.push(response)
|
||||
}
|
||||
|
||||
@@ -22,7 +22,6 @@ import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/ev
|
||||
import { processInputFileFields } from '@/lib/execution/files'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
|
||||
import {
|
||||
cleanupExecutionBase64Cache,
|
||||
hydrateUserFilesWithBase64,
|
||||
@@ -764,7 +763,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
const encoder = new TextEncoder()
|
||||
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
|
||||
let isStreamClosed = false
|
||||
let sseDecremented = false
|
||||
|
||||
const eventWriter = createExecutionEventWriter(executionId)
|
||||
setExecutionMeta(executionId, {
|
||||
@@ -775,7 +773,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
|
||||
const stream = new ReadableStream<Uint8Array>({
|
||||
async start(controller) {
|
||||
incrementSSEConnections('workflow-execute')
|
||||
let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null
|
||||
|
||||
const sendEvent = (event: ExecutionEvent) => {
|
||||
@@ -1159,10 +1156,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
if (executionId) {
|
||||
await cleanupExecutionBase64Cache(executionId)
|
||||
}
|
||||
if (!sseDecremented) {
|
||||
sseDecremented = true
|
||||
decrementSSEConnections('workflow-execute')
|
||||
}
|
||||
if (!isStreamClosed) {
|
||||
try {
|
||||
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
|
||||
@@ -1174,10 +1167,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
cancel() {
|
||||
isStreamClosed = true
|
||||
logger.info(`[${requestId}] Client disconnected from SSE stream`)
|
||||
if (!sseDecremented) {
|
||||
sseDecremented = true
|
||||
decrementSSEConnections('workflow-execute')
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
|
||||
@@ -7,7 +7,6 @@ import {
|
||||
getExecutionMeta,
|
||||
readExecutionEvents,
|
||||
} from '@/lib/execution/event-buffer'
|
||||
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
|
||||
import { formatSSEEvent } from '@/lib/workflows/executor/execution-events'
|
||||
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
|
||||
|
||||
@@ -74,10 +73,8 @@ export async function GET(
|
||||
|
||||
let closed = false
|
||||
|
||||
let sseDecremented = false
|
||||
const stream = new ReadableStream<Uint8Array>({
|
||||
async start(controller) {
|
||||
incrementSSEConnections('execution-stream-reconnect')
|
||||
let lastEventId = fromEventId
|
||||
const pollDeadline = Date.now() + MAX_POLL_DURATION_MS
|
||||
|
||||
@@ -145,20 +142,11 @@ export async function GET(
|
||||
controller.close()
|
||||
} catch {}
|
||||
}
|
||||
} finally {
|
||||
if (!sseDecremented) {
|
||||
sseDecremented = true
|
||||
decrementSSEConnections('execution-stream-reconnect')
|
||||
}
|
||||
}
|
||||
},
|
||||
cancel() {
|
||||
closed = true
|
||||
logger.info('Client disconnected from reconnection stream', { executionId })
|
||||
if (!sseDecremented) {
|
||||
sseDecremented = true
|
||||
decrementSSEConnections('execution-stream-reconnect')
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ import {
|
||||
} from '@/components/emails'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { decryptSecret } from '@/lib/core/security/encryption'
|
||||
import { secureFetchWithValidation } from '@/lib/core/security/input-validation.server'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
import { sendEmail } from '@/lib/messaging/email/mailer'
|
||||
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
|
||||
@@ -135,18 +136,18 @@ async function testWebhook(subscription: typeof workspaceNotificationSubscriptio
|
||||
headers['sim-signature'] = `t=${timestamp},v1=${signature}`
|
||||
}
|
||||
|
||||
const controller = new AbortController()
|
||||
const timeoutId = setTimeout(() => controller.abort(), 10000)
|
||||
|
||||
try {
|
||||
const response = await fetch(webhookConfig.url, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body,
|
||||
signal: controller.signal,
|
||||
})
|
||||
|
||||
clearTimeout(timeoutId)
|
||||
const response = await secureFetchWithValidation(
|
||||
webhookConfig.url,
|
||||
{
|
||||
method: 'POST',
|
||||
headers,
|
||||
body,
|
||||
timeout: 10000,
|
||||
allowHttp: true,
|
||||
},
|
||||
'webhookUrl'
|
||||
)
|
||||
const responseBody = await response.text().catch(() => '')
|
||||
|
||||
return {
|
||||
@@ -157,12 +158,10 @@ async function testWebhook(subscription: typeof workspaceNotificationSubscriptio
|
||||
timestamp: new Date().toISOString(),
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
clearTimeout(timeoutId)
|
||||
const err = error as Error & { name?: string }
|
||||
if (err.name === 'AbortError') {
|
||||
return { success: false, error: 'Request timeout after 10 seconds' }
|
||||
}
|
||||
return { success: false, error: err.message }
|
||||
logger.warn('Webhook test failed', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
return { success: false, error: 'Failed to deliver webhook' }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -268,13 +267,15 @@ async function testSlack(
|
||||
|
||||
return {
|
||||
success: result.ok,
|
||||
error: result.error,
|
||||
error: result.ok ? undefined : `Slack error: ${result.error || 'unknown'}`,
|
||||
channel: result.channel,
|
||||
timestamp: new Date().toISOString(),
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
const err = error as Error
|
||||
return { success: false, error: err.message }
|
||||
logger.warn('Slack test notification failed', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
return { success: false, error: 'Failed to send Slack notification' }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,18 +1,13 @@
|
||||
import { db } from '@sim/db'
|
||||
import { webhook, workflow as workflowTable } from '@sim/db/schema'
|
||||
import { account, webhook } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { task } from '@trigger.dev/sdk'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { getHighestPrioritySubscription } from '@/lib/billing'
|
||||
import {
|
||||
createTimeoutAbortController,
|
||||
getExecutionTimeout,
|
||||
getTimeoutErrorMessage,
|
||||
} from '@/lib/core/execution-limits'
|
||||
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
|
||||
import { IdempotencyService, webhookIdempotency } from '@/lib/core/idempotency'
|
||||
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
|
||||
import { processExecutionFiles } from '@/lib/execution/files'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||
import { WebhookAttachmentProcessor } from '@/lib/webhooks/attachment-processor'
|
||||
@@ -20,7 +15,7 @@ import { fetchAndProcessAirtablePayloads, formatWebhookInput } from '@/lib/webho
|
||||
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
|
||||
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
|
||||
import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
|
||||
import { getWorkflowById } from '@/lib/workflows/utils'
|
||||
import { resolveOAuthAccountId } from '@/app/api/auth/oauth/utils'
|
||||
import { getBlock } from '@/blocks'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata } from '@/executor/execution/types'
|
||||
@@ -109,8 +104,8 @@ export type WebhookExecutionPayload = {
|
||||
headers: Record<string, string>
|
||||
path: string
|
||||
blockId?: string
|
||||
workspaceId?: string
|
||||
credentialId?: string
|
||||
credentialAccountUserId?: string
|
||||
}
|
||||
|
||||
export async function executeWebhookJob(payload: WebhookExecutionPayload) {
|
||||
@@ -143,6 +138,22 @@ export async function executeWebhookJob(payload: WebhookExecutionPayload) {
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve the account userId for a credential
|
||||
*/
|
||||
async function resolveCredentialAccountUserId(credentialId: string): Promise<string | undefined> {
|
||||
const resolved = await resolveOAuthAccountId(credentialId)
|
||||
if (!resolved) {
|
||||
return undefined
|
||||
}
|
||||
const [credentialRecord] = await db
|
||||
.select({ userId: account.userId })
|
||||
.from(account)
|
||||
.where(eq(account.id, resolved.accountId))
|
||||
.limit(1)
|
||||
return credentialRecord?.userId
|
||||
}
|
||||
|
||||
async function executeWebhookJobInternal(
|
||||
payload: WebhookExecutionPayload,
|
||||
executionId: string,
|
||||
@@ -155,17 +166,56 @@ async function executeWebhookJobInternal(
|
||||
requestId
|
||||
)
|
||||
|
||||
const userSubscription = await getHighestPrioritySubscription(payload.userId)
|
||||
const asyncTimeout = getExecutionTimeout(
|
||||
userSubscription?.plan as SubscriptionPlan | undefined,
|
||||
'async'
|
||||
)
|
||||
// Resolve workflow record, billing actor, subscription, and timeout
|
||||
const preprocessResult = await preprocessExecution({
|
||||
workflowId: payload.workflowId,
|
||||
userId: payload.userId,
|
||||
triggerType: 'webhook',
|
||||
executionId,
|
||||
requestId,
|
||||
checkRateLimit: false,
|
||||
checkDeployment: false,
|
||||
skipUsageLimits: true,
|
||||
workspaceId: payload.workspaceId,
|
||||
loggingSession,
|
||||
})
|
||||
|
||||
if (!preprocessResult.success) {
|
||||
throw new Error(preprocessResult.error?.message || 'Preprocessing failed in background job')
|
||||
}
|
||||
|
||||
const { workflowRecord, executionTimeout } = preprocessResult
|
||||
if (!workflowRecord) {
|
||||
throw new Error(`Workflow ${payload.workflowId} not found during preprocessing`)
|
||||
}
|
||||
|
||||
const workspaceId = workflowRecord.workspaceId
|
||||
if (!workspaceId) {
|
||||
throw new Error(`Workflow ${payload.workflowId} has no associated workspace`)
|
||||
}
|
||||
|
||||
const workflowVariables = (workflowRecord.variables as Record<string, any>) || {}
|
||||
const asyncTimeout = executionTimeout?.async ?? 120_000
|
||||
const timeoutController = createTimeoutAbortController(asyncTimeout)
|
||||
|
||||
let deploymentVersionId: string | undefined
|
||||
|
||||
try {
|
||||
const workflowData = await loadDeployedWorkflowState(payload.workflowId)
|
||||
// Parallelize workflow state, webhook record, and credential resolution
|
||||
const [workflowData, webhookRows, resolvedCredentialUserId] = await Promise.all([
|
||||
loadDeployedWorkflowState(payload.workflowId, workspaceId),
|
||||
db.select().from(webhook).where(eq(webhook.id, payload.webhookId)).limit(1),
|
||||
payload.credentialId
|
||||
? resolveCredentialAccountUserId(payload.credentialId)
|
||||
: Promise.resolve(undefined),
|
||||
])
|
||||
const credentialAccountUserId = resolvedCredentialUserId
|
||||
if (payload.credentialId && !credentialAccountUserId) {
|
||||
logger.warn(
|
||||
`[${requestId}] Failed to resolve credential account for credential ${payload.credentialId}`
|
||||
)
|
||||
}
|
||||
|
||||
if (!workflowData) {
|
||||
throw new Error(
|
||||
'Workflow state not found. The workflow may not be deployed or the deployment data may be corrupted.'
|
||||
@@ -178,28 +228,11 @@ async function executeWebhookJobInternal(
|
||||
? (workflowData.deploymentVersionId as string)
|
||||
: undefined
|
||||
|
||||
const wfRows = await db
|
||||
.select({ workspaceId: workflowTable.workspaceId, variables: workflowTable.variables })
|
||||
.from(workflowTable)
|
||||
.where(eq(workflowTable.id, payload.workflowId))
|
||||
.limit(1)
|
||||
const workspaceId = wfRows[0]?.workspaceId
|
||||
if (!workspaceId) {
|
||||
throw new Error(`Workflow ${payload.workflowId} has no associated workspace`)
|
||||
}
|
||||
const workflowVariables = (wfRows[0]?.variables as Record<string, any>) || {}
|
||||
|
||||
// Handle special Airtable case
|
||||
if (payload.provider === 'airtable') {
|
||||
logger.info(`[${requestId}] Processing Airtable webhook via fetchAndProcessAirtablePayloads`)
|
||||
|
||||
// Load the actual webhook record from database to get providerConfig
|
||||
const [webhookRecord] = await db
|
||||
.select()
|
||||
.from(webhook)
|
||||
.where(eq(webhook.id, payload.webhookId))
|
||||
.limit(1)
|
||||
|
||||
const webhookRecord = webhookRows[0]
|
||||
if (!webhookRecord) {
|
||||
throw new Error(`Webhook record not found: ${payload.webhookId}`)
|
||||
}
|
||||
@@ -210,29 +243,20 @@ async function executeWebhookJobInternal(
|
||||
providerConfig: webhookRecord.providerConfig,
|
||||
}
|
||||
|
||||
// Create a mock workflow object for Airtable processing
|
||||
const mockWorkflow = {
|
||||
id: payload.workflowId,
|
||||
userId: payload.userId,
|
||||
}
|
||||
|
||||
// Get the processed Airtable input
|
||||
const airtableInput = await fetchAndProcessAirtablePayloads(
|
||||
webhookData,
|
||||
mockWorkflow,
|
||||
requestId
|
||||
)
|
||||
|
||||
// If we got input (changes), execute the workflow like other providers
|
||||
if (airtableInput) {
|
||||
logger.info(`[${requestId}] Executing workflow with Airtable changes`)
|
||||
|
||||
// Get workflow for core execution
|
||||
const workflow = await getWorkflowById(payload.workflowId)
|
||||
if (!workflow) {
|
||||
throw new Error(`Workflow ${payload.workflowId} not found`)
|
||||
}
|
||||
|
||||
const metadata: ExecutionMetadata = {
|
||||
requestId,
|
||||
executionId,
|
||||
@@ -240,13 +264,13 @@ async function executeWebhookJobInternal(
|
||||
workspaceId,
|
||||
userId: payload.userId,
|
||||
sessionUserId: undefined,
|
||||
workflowUserId: workflow.userId,
|
||||
workflowUserId: workflowRecord.userId,
|
||||
triggerType: payload.provider || 'webhook',
|
||||
triggerBlockId: payload.blockId,
|
||||
useDraftState: false,
|
||||
startTime: new Date().toISOString(),
|
||||
isClientSession: false,
|
||||
credentialAccountUserId: payload.credentialAccountUserId,
|
||||
credentialAccountUserId,
|
||||
workflowStateOverride: {
|
||||
blocks,
|
||||
edges,
|
||||
@@ -258,7 +282,7 @@ async function executeWebhookJobInternal(
|
||||
|
||||
const snapshot = new ExecutionSnapshot(
|
||||
metadata,
|
||||
workflow,
|
||||
workflowRecord,
|
||||
airtableInput,
|
||||
workflowVariables,
|
||||
[]
|
||||
@@ -329,7 +353,6 @@ async function executeWebhookJobInternal(
|
||||
// No changes to process
|
||||
logger.info(`[${requestId}] No Airtable changes to process`)
|
||||
|
||||
// Start logging session so the complete call has a log entry to update
|
||||
await loggingSession.safeStart({
|
||||
userId: payload.userId,
|
||||
workspaceId,
|
||||
@@ -357,13 +380,6 @@ async function executeWebhookJobInternal(
|
||||
}
|
||||
|
||||
// Format input for standard webhooks
|
||||
// Load the actual webhook to get providerConfig (needed for Teams credentialId)
|
||||
const webhookRows = await db
|
||||
.select()
|
||||
.from(webhook)
|
||||
.where(eq(webhook.id, payload.webhookId))
|
||||
.limit(1)
|
||||
|
||||
const actualWebhook =
|
||||
webhookRows.length > 0
|
||||
? webhookRows[0]
|
||||
@@ -386,7 +402,6 @@ async function executeWebhookJobInternal(
|
||||
if (!input && payload.provider === 'whatsapp') {
|
||||
logger.info(`[${requestId}] No messages in WhatsApp payload, skipping execution`)
|
||||
|
||||
// Start logging session so the complete call has a log entry to update
|
||||
await loggingSession.safeStart({
|
||||
userId: payload.userId,
|
||||
workspaceId,
|
||||
@@ -452,7 +467,6 @@ async function executeWebhookJobInternal(
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error processing trigger file outputs:`, error)
|
||||
// Continue without processing attachments rather than failing execution
|
||||
}
|
||||
}
|
||||
|
||||
@@ -499,18 +513,11 @@ async function executeWebhookJobInternal(
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error processing generic webhook files:`, error)
|
||||
// Continue without processing files rather than failing execution
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(`[${requestId}] Executing workflow for ${payload.provider} webhook`)
|
||||
|
||||
// Get workflow for core execution
|
||||
const workflow = await getWorkflowById(payload.workflowId)
|
||||
if (!workflow) {
|
||||
throw new Error(`Workflow ${payload.workflowId} not found`)
|
||||
}
|
||||
|
||||
const metadata: ExecutionMetadata = {
|
||||
requestId,
|
||||
executionId,
|
||||
@@ -518,13 +525,13 @@ async function executeWebhookJobInternal(
|
||||
workspaceId,
|
||||
userId: payload.userId,
|
||||
sessionUserId: undefined,
|
||||
workflowUserId: workflow.userId,
|
||||
workflowUserId: workflowRecord.userId,
|
||||
triggerType: payload.provider || 'webhook',
|
||||
triggerBlockId: payload.blockId,
|
||||
useDraftState: false,
|
||||
startTime: new Date().toISOString(),
|
||||
isClientSession: false,
|
||||
credentialAccountUserId: payload.credentialAccountUserId,
|
||||
credentialAccountUserId,
|
||||
workflowStateOverride: {
|
||||
blocks,
|
||||
edges,
|
||||
@@ -536,7 +543,13 @@ async function executeWebhookJobInternal(
|
||||
|
||||
const triggerInput = input || {}
|
||||
|
||||
const snapshot = new ExecutionSnapshot(metadata, workflow, triggerInput, workflowVariables, [])
|
||||
const snapshot = new ExecutionSnapshot(
|
||||
metadata,
|
||||
workflowRecord,
|
||||
triggerInput,
|
||||
workflowVariables,
|
||||
[]
|
||||
)
|
||||
|
||||
const executionResult = await executeWorkflowCore({
|
||||
snapshot,
|
||||
@@ -611,23 +624,9 @@ async function executeWebhookJobInternal(
|
||||
})
|
||||
|
||||
try {
|
||||
const wfRow = await db
|
||||
.select({ workspaceId: workflowTable.workspaceId })
|
||||
.from(workflowTable)
|
||||
.where(eq(workflowTable.id, payload.workflowId))
|
||||
.limit(1)
|
||||
const errorWorkspaceId = wfRow[0]?.workspaceId
|
||||
|
||||
if (!errorWorkspaceId) {
|
||||
logger.warn(
|
||||
`[${requestId}] Cannot log error: workflow ${payload.workflowId} has no workspace`
|
||||
)
|
||||
throw error
|
||||
}
|
||||
|
||||
await loggingSession.safeStart({
|
||||
userId: payload.userId,
|
||||
workspaceId: errorWorkspaceId,
|
||||
workspaceId,
|
||||
variables: {},
|
||||
triggerData: {
|
||||
isTest: false,
|
||||
|
||||
@@ -19,6 +19,7 @@ import { checkUsageStatus } from '@/lib/billing/calculations/usage-monitor'
|
||||
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
|
||||
import { RateLimiter } from '@/lib/core/rate-limiter'
|
||||
import { decryptSecret } from '@/lib/core/security/encryption'
|
||||
import { secureFetchWithValidation } from '@/lib/core/security/input-validation.server'
|
||||
import { formatDuration } from '@/lib/core/utils/formatting'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
import type { TraceSpan, WorkflowExecutionLog } from '@/lib/logs/types'
|
||||
@@ -207,18 +208,18 @@ async function deliverWebhook(
|
||||
headers['sim-signature'] = `t=${payload.timestamp},v1=${signature}`
|
||||
}
|
||||
|
||||
const controller = new AbortController()
|
||||
const timeoutId = setTimeout(() => controller.abort(), 30000)
|
||||
|
||||
try {
|
||||
const response = await fetch(webhookConfig.url, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body,
|
||||
signal: controller.signal,
|
||||
})
|
||||
|
||||
clearTimeout(timeoutId)
|
||||
const response = await secureFetchWithValidation(
|
||||
webhookConfig.url,
|
||||
{
|
||||
method: 'POST',
|
||||
headers,
|
||||
body,
|
||||
timeout: 30000,
|
||||
allowHttp: true,
|
||||
},
|
||||
'webhookUrl'
|
||||
)
|
||||
|
||||
return {
|
||||
success: response.ok,
|
||||
@@ -226,11 +227,13 @@ async function deliverWebhook(
|
||||
error: response.ok ? undefined : `HTTP ${response.status}`,
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
clearTimeout(timeoutId)
|
||||
const err = error as Error & { name?: string }
|
||||
logger.warn('Webhook delivery failed', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
webhookUrl: webhookConfig.url,
|
||||
})
|
||||
return {
|
||||
success: false,
|
||||
error: err.name === 'AbortError' ? 'Request timeout' : err.message,
|
||||
error: 'Failed to deliver webhook',
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
|
||||
authMode: AuthMode.ApiKey,
|
||||
longDescription:
|
||||
'Integrate Parallel AI into the workflow. Can search the web, extract information from URLs, and conduct deep research.',
|
||||
docsLink: 'https://docs.parallel.ai/',
|
||||
docsLink: 'https://docs.sim.ai/tools/parallel-ai',
|
||||
category: 'tools',
|
||||
bgColor: '#E0E0E0',
|
||||
icon: ParallelIcon,
|
||||
@@ -56,7 +56,7 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
|
||||
title: 'Extract Objective',
|
||||
type: 'long-input',
|
||||
placeholder: 'What information to extract from the URLs?',
|
||||
required: true,
|
||||
required: false,
|
||||
condition: { field: 'operation', value: 'extract' },
|
||||
},
|
||||
{
|
||||
@@ -89,6 +89,37 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
|
||||
required: true,
|
||||
condition: { field: 'operation', value: 'deep_research' },
|
||||
},
|
||||
{
|
||||
id: 'search_mode',
|
||||
title: 'Search Mode',
|
||||
type: 'dropdown',
|
||||
options: [
|
||||
{ label: 'One-Shot', id: 'one-shot' },
|
||||
{ label: 'Agentic', id: 'agentic' },
|
||||
{ label: 'Fast', id: 'fast' },
|
||||
],
|
||||
value: () => 'one-shot',
|
||||
condition: { field: 'operation', value: 'search' },
|
||||
mode: 'advanced',
|
||||
},
|
||||
{
|
||||
id: 'search_include_domains',
|
||||
title: 'Include Domains',
|
||||
type: 'short-input',
|
||||
placeholder: 'Comma-separated domains to include (e.g., .edu, example.com)',
|
||||
required: false,
|
||||
condition: { field: 'operation', value: 'search' },
|
||||
mode: 'advanced',
|
||||
},
|
||||
{
|
||||
id: 'search_exclude_domains',
|
||||
title: 'Exclude Domains',
|
||||
type: 'short-input',
|
||||
placeholder: 'Comma-separated domains to exclude',
|
||||
required: false,
|
||||
condition: { field: 'operation', value: 'search' },
|
||||
mode: 'advanced',
|
||||
},
|
||||
{
|
||||
id: 'include_domains',
|
||||
title: 'Include Domains',
|
||||
@@ -96,6 +127,7 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
|
||||
placeholder: 'Comma-separated domains to include',
|
||||
required: false,
|
||||
condition: { field: 'operation', value: 'deep_research' },
|
||||
mode: 'advanced',
|
||||
},
|
||||
{
|
||||
id: 'exclude_domains',
|
||||
@@ -104,37 +136,37 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
|
||||
placeholder: 'Comma-separated domains to exclude',
|
||||
required: false,
|
||||
condition: { field: 'operation', value: 'deep_research' },
|
||||
mode: 'advanced',
|
||||
},
|
||||
{
|
||||
id: 'processor',
|
||||
title: 'Processor',
|
||||
title: 'Research Processor',
|
||||
type: 'dropdown',
|
||||
options: [
|
||||
{ label: 'Lite', id: 'lite' },
|
||||
{ label: 'Base', id: 'base' },
|
||||
{ label: 'Core', id: 'core' },
|
||||
{ label: 'Core 2x', id: 'core2x' },
|
||||
{ label: 'Pro', id: 'pro' },
|
||||
{ label: 'Ultra', id: 'ultra' },
|
||||
{ label: 'Ultra 2x', id: 'ultra2x' },
|
||||
{ label: 'Ultra 4x', id: 'ultra4x' },
|
||||
{ label: 'Pro Fast', id: 'pro-fast' },
|
||||
{ label: 'Ultra Fast', id: 'ultra-fast' },
|
||||
],
|
||||
value: () => 'base',
|
||||
condition: { field: 'operation', value: ['search', 'deep_research'] },
|
||||
value: () => 'pro',
|
||||
condition: { field: 'operation', value: 'deep_research' },
|
||||
mode: 'advanced',
|
||||
},
|
||||
{
|
||||
id: 'max_results',
|
||||
title: 'Max Results',
|
||||
type: 'short-input',
|
||||
placeholder: '5',
|
||||
placeholder: '10',
|
||||
condition: { field: 'operation', value: 'search' },
|
||||
mode: 'advanced',
|
||||
},
|
||||
{
|
||||
id: 'max_chars_per_result',
|
||||
title: 'Max Chars',
|
||||
title: 'Max Chars Per Result',
|
||||
type: 'short-input',
|
||||
placeholder: '1500',
|
||||
condition: { field: 'operation', value: 'search' },
|
||||
mode: 'advanced',
|
||||
},
|
||||
{
|
||||
id: 'apiKey',
|
||||
@@ -149,8 +181,6 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
|
||||
access: ['parallel_search', 'parallel_extract', 'parallel_deep_research'],
|
||||
config: {
|
||||
tool: (params) => {
|
||||
if (params.extract_objective) params.objective = params.extract_objective
|
||||
if (params.research_input) params.input = params.research_input
|
||||
switch (params.operation) {
|
||||
case 'search':
|
||||
return 'parallel_search'
|
||||
@@ -174,21 +204,30 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
|
||||
.filter((query: string) => query.length > 0)
|
||||
if (queries.length > 0) {
|
||||
result.search_queries = queries
|
||||
} else {
|
||||
result.search_queries = undefined
|
||||
}
|
||||
}
|
||||
if (params.search_mode && params.search_mode !== 'one-shot') {
|
||||
result.mode = params.search_mode
|
||||
}
|
||||
if (params.max_results) result.max_results = Number(params.max_results)
|
||||
if (params.max_chars_per_result) {
|
||||
result.max_chars_per_result = Number(params.max_chars_per_result)
|
||||
}
|
||||
result.include_domains = params.search_include_domains || undefined
|
||||
result.exclude_domains = params.search_exclude_domains || undefined
|
||||
}
|
||||
|
||||
if (operation === 'extract') {
|
||||
if (params.extract_objective) result.objective = params.extract_objective
|
||||
result.excerpts = !(params.excerpts === 'false' || params.excerpts === false)
|
||||
result.full_content = params.full_content === 'true' || params.full_content === true
|
||||
}
|
||||
|
||||
if (operation === 'deep_research') {
|
||||
if (params.research_input) result.input = params.research_input
|
||||
if (params.processor) result.processor = params.processor
|
||||
}
|
||||
|
||||
return result
|
||||
},
|
||||
},
|
||||
@@ -202,29 +241,34 @@ export const ParallelBlock: BlockConfig<ToolResponse> = {
|
||||
excerpts: { type: 'boolean', description: 'Include excerpts' },
|
||||
full_content: { type: 'boolean', description: 'Include full content' },
|
||||
research_input: { type: 'string', description: 'Deep research query' },
|
||||
include_domains: { type: 'string', description: 'Domains to include' },
|
||||
exclude_domains: { type: 'string', description: 'Domains to exclude' },
|
||||
processor: { type: 'string', description: 'Processing method' },
|
||||
include_domains: { type: 'string', description: 'Domains to include (deep research)' },
|
||||
exclude_domains: { type: 'string', description: 'Domains to exclude (deep research)' },
|
||||
search_include_domains: { type: 'string', description: 'Domains to include (search)' },
|
||||
search_exclude_domains: { type: 'string', description: 'Domains to exclude (search)' },
|
||||
search_mode: { type: 'string', description: 'Search mode (one-shot, agentic, fast)' },
|
||||
processor: { type: 'string', description: 'Research processing tier' },
|
||||
max_results: { type: 'number', description: 'Maximum number of results' },
|
||||
max_chars_per_result: { type: 'number', description: 'Maximum characters per result' },
|
||||
apiKey: { type: 'string', description: 'Parallel AI API key' },
|
||||
},
|
||||
outputs: {
|
||||
results: { type: 'string', description: 'Search or extract results (JSON stringified)' },
|
||||
results: {
|
||||
type: 'json',
|
||||
description: 'Search or extract results (array of url, title, excerpts)',
|
||||
},
|
||||
search_id: { type: 'string', description: 'Search request ID (for search)' },
|
||||
extract_id: { type: 'string', description: 'Extract request ID (for extract)' },
|
||||
status: { type: 'string', description: 'Task status (for deep research)' },
|
||||
run_id: { type: 'string', description: 'Task run ID (for deep research)' },
|
||||
message: { type: 'string', description: 'Status message (for deep research)' },
|
||||
content: {
|
||||
type: 'string',
|
||||
description: 'Research content (for deep research, JSON stringified)',
|
||||
type: 'json',
|
||||
description: 'Research content (for deep research, structured based on output_schema)',
|
||||
},
|
||||
basis: {
|
||||
type: 'string',
|
||||
description: 'Citations and sources (for deep research, JSON stringified)',
|
||||
},
|
||||
metadata: {
|
||||
type: 'string',
|
||||
description: 'Task metadata (for deep research, JSON stringified)',
|
||||
type: 'json',
|
||||
description:
|
||||
'Citations and sources with field, reasoning, citations, confidence (for deep research)',
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import {
|
||||
ClientFactoryOptions,
|
||||
} from '@a2a-js/sdk/client'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { validateUrlWithDNS } from '@/lib/core/security/input-validation.server'
|
||||
import { isInternalFileUrl } from '@/lib/uploads/utils/file-utils'
|
||||
import { A2A_TERMINAL_STATES } from './constants'
|
||||
|
||||
@@ -43,6 +44,11 @@ class ApiKeyInterceptor implements CallInterceptor {
|
||||
* Tries standard path first, falls back to root URL for compatibility.
|
||||
*/
|
||||
export async function createA2AClient(agentUrl: string, apiKey?: string): Promise<Client> {
|
||||
const validation = await validateUrlWithDNS(agentUrl, 'agentUrl')
|
||||
if (!validation.isValid) {
|
||||
throw new Error(validation.error || 'Agent URL validation failed')
|
||||
}
|
||||
|
||||
const factoryOptions = apiKey
|
||||
? ClientFactoryOptions.createFrom(ClientFactoryOptions.default, {
|
||||
clientConfig: {
|
||||
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
isLegacyApiKeyFormat,
|
||||
} from '@/lib/api-key/crypto'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import { safeCompare } from '@/lib/core/security/encryption'
|
||||
|
||||
const logger = createLogger('ApiKeyAuth')
|
||||
|
||||
@@ -39,7 +40,7 @@ export async function authenticateApiKey(inputKey: string, storedKey: string): P
|
||||
if (isEncryptedKey(storedKey)) {
|
||||
try {
|
||||
const { decrypted } = await decryptApiKey(storedKey)
|
||||
return inputKey === decrypted
|
||||
return safeCompare(inputKey, decrypted)
|
||||
} catch (decryptError) {
|
||||
logger.error('Failed to decrypt stored API key:', { error: decryptError })
|
||||
return false
|
||||
@@ -54,27 +55,27 @@ export async function authenticateApiKey(inputKey: string, storedKey: string): P
|
||||
if (isEncryptedKey(storedKey)) {
|
||||
try {
|
||||
const { decrypted } = await decryptApiKey(storedKey)
|
||||
return inputKey === decrypted
|
||||
return safeCompare(inputKey, decrypted)
|
||||
} catch (decryptError) {
|
||||
logger.error('Failed to decrypt stored API key:', { error: decryptError })
|
||||
// Fall through to plain text comparison if decryption fails
|
||||
}
|
||||
}
|
||||
// Legacy format can match against plain text storage
|
||||
return inputKey === storedKey
|
||||
return safeCompare(inputKey, storedKey)
|
||||
}
|
||||
|
||||
// If no recognized prefix, fall back to original behavior
|
||||
if (isEncryptedKey(storedKey)) {
|
||||
try {
|
||||
const { decrypted } = await decryptApiKey(storedKey)
|
||||
return inputKey === decrypted
|
||||
return safeCompare(inputKey, decrypted)
|
||||
} catch (decryptError) {
|
||||
logger.error('Failed to decrypt stored API key:', { error: decryptError })
|
||||
}
|
||||
}
|
||||
|
||||
return inputKey === storedKey
|
||||
return safeCompare(inputKey, storedKey)
|
||||
} catch (error) {
|
||||
logger.error('API key authentication error:', { error })
|
||||
return false
|
||||
|
||||
@@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { jwtVerify, SignJWT } from 'jose'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import { safeCompare } from '@/lib/core/security/encryption'
|
||||
|
||||
const logger = createLogger('CronAuth')
|
||||
|
||||
@@ -81,7 +82,8 @@ export function verifyCronAuth(request: NextRequest, context?: string): NextResp
|
||||
|
||||
const authHeader = request.headers.get('authorization')
|
||||
const expectedAuth = `Bearer ${env.CRON_SECRET}`
|
||||
if (authHeader !== expectedAuth) {
|
||||
const isValid = authHeader !== null && safeCompare(authHeader, expectedAuth)
|
||||
if (!isValid) {
|
||||
const contextInfo = context ? ` for ${context}` : ''
|
||||
logger.warn(`Unauthorized CRON access attempt${contextInfo}`, {
|
||||
providedAuth: authHeader,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import { safeCompare } from '@/lib/core/security/encryption'
|
||||
|
||||
export function checkInternalApiKey(req: NextRequest) {
|
||||
const apiKey = req.headers.get('x-api-key')
|
||||
@@ -13,7 +14,7 @@ export function checkInternalApiKey(req: NextRequest) {
|
||||
return { success: false, error: 'API key required' }
|
||||
}
|
||||
|
||||
if (apiKey !== expectedApiKey) {
|
||||
if (!safeCompare(apiKey, expectedApiKey)) {
|
||||
return { success: false, error: 'Invalid API key' }
|
||||
}
|
||||
|
||||
|
||||
@@ -81,7 +81,9 @@ export function setDeploymentAuthCookie(
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds CORS headers to allow cross-origin requests for embedded deployments
|
||||
* Adds CORS headers to allow cross-origin requests for embedded deployments.
|
||||
* Embedded chat widgets and forms are designed to run on any customer domain,
|
||||
* so we reflect the requesting origin rather than restricting to an allowlist.
|
||||
*/
|
||||
export function addCorsHeaders(response: NextResponse, request: NextRequest): NextResponse {
|
||||
const origin = request.headers.get('origin') || ''
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { createCipheriv, createDecipheriv, randomBytes, timingSafeEqual } from 'crypto'
|
||||
import { createCipheriv, createDecipheriv, createHmac, randomBytes, timingSafeEqual } from 'crypto'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
|
||||
@@ -91,8 +91,8 @@ export function generatePassword(length = 24): string {
|
||||
* @returns True if strings are equal, false otherwise
|
||||
*/
|
||||
export function safeCompare(a: string, b: string): boolean {
|
||||
if (a.length !== b.length) {
|
||||
return false
|
||||
}
|
||||
return timingSafeEqual(Buffer.from(a), Buffer.from(b))
|
||||
const key = 'safeCompare'
|
||||
const ha = createHmac('sha256', key).update(a).digest()
|
||||
const hb = createHmac('sha256', key).update(b).digest()
|
||||
return timingSafeEqual(ha, hb)
|
||||
}
|
||||
|
||||
@@ -54,9 +54,10 @@ function isPrivateOrReservedIP(ip: string): boolean {
|
||||
*/
|
||||
export async function validateUrlWithDNS(
|
||||
url: string | null | undefined,
|
||||
paramName = 'url'
|
||||
paramName = 'url',
|
||||
options: { allowHttp?: boolean } = {}
|
||||
): Promise<AsyncValidationResult> {
|
||||
const basicValidation = validateExternalUrl(url, paramName)
|
||||
const basicValidation = validateExternalUrl(url, paramName, options)
|
||||
if (!basicValidation.isValid) {
|
||||
return basicValidation
|
||||
}
|
||||
@@ -88,7 +89,10 @@ export async function validateUrlWithDNS(
|
||||
return ip === '127.0.0.1' || ip === '::1'
|
||||
})()
|
||||
|
||||
if (isPrivateOrReservedIP(address) && !(isLocalhost && resolvedIsLoopback)) {
|
||||
if (
|
||||
isPrivateOrReservedIP(address) &&
|
||||
!(isLocalhost && resolvedIsLoopback && !options.allowHttp)
|
||||
) {
|
||||
logger.warn('URL resolves to blocked IP address', {
|
||||
paramName,
|
||||
hostname,
|
||||
@@ -118,6 +122,70 @@ export async function validateUrlWithDNS(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates a database hostname by resolving DNS and checking the resolved IP
|
||||
* against private/reserved ranges to prevent SSRF via database connections.
|
||||
*
|
||||
* Unlike validateHostname (which enforces strict RFC hostname format), this
|
||||
* function is permissive about hostname format to avoid breaking legitimate
|
||||
* database hostnames (e.g. underscores in Docker/K8s service names). It only
|
||||
* blocks localhost and private/reserved IPs.
|
||||
*
|
||||
* @param host - The database hostname to validate
|
||||
* @param paramName - Name of the parameter for error messages
|
||||
* @returns AsyncValidationResult with resolved IP
|
||||
*/
|
||||
export async function validateDatabaseHost(
|
||||
host: string | null | undefined,
|
||||
paramName = 'host'
|
||||
): Promise<AsyncValidationResult> {
|
||||
if (!host) {
|
||||
return { isValid: false, error: `${paramName} is required` }
|
||||
}
|
||||
|
||||
const lowerHost = host.toLowerCase()
|
||||
|
||||
if (lowerHost === 'localhost') {
|
||||
return { isValid: false, error: `${paramName} cannot be localhost` }
|
||||
}
|
||||
|
||||
if (ipaddr.isValid(lowerHost) && isPrivateOrReservedIP(lowerHost)) {
|
||||
return { isValid: false, error: `${paramName} cannot be a private IP address` }
|
||||
}
|
||||
|
||||
try {
|
||||
const { address } = await dns.lookup(host, { verbatim: true })
|
||||
|
||||
if (isPrivateOrReservedIP(address)) {
|
||||
logger.warn('Database host resolves to blocked IP address', {
|
||||
paramName,
|
||||
hostname: host,
|
||||
resolvedIP: address,
|
||||
})
|
||||
return {
|
||||
isValid: false,
|
||||
error: `${paramName} resolves to a blocked IP address`,
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
isValid: true,
|
||||
resolvedIP: address,
|
||||
originalHostname: host,
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn('DNS lookup failed for database host', {
|
||||
paramName,
|
||||
hostname: host,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
return {
|
||||
isValid: false,
|
||||
error: `${paramName} hostname could not be resolved`,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export interface SecureFetchOptions {
|
||||
method?: string
|
||||
headers?: Record<string, string>
|
||||
@@ -183,7 +251,7 @@ function resolveRedirectUrl(baseUrl: string, location: string): string {
|
||||
export async function secureFetchWithPinnedIP(
|
||||
url: string,
|
||||
resolvedIP: string,
|
||||
options: SecureFetchOptions = {},
|
||||
options: SecureFetchOptions & { allowHttp?: boolean } = {},
|
||||
redirectCount = 0
|
||||
): Promise<SecureFetchResponse> {
|
||||
const maxRedirects = options.maxRedirects ?? DEFAULT_MAX_REDIRECTS
|
||||
@@ -231,7 +299,7 @@ export async function secureFetchWithPinnedIP(
|
||||
res.resume()
|
||||
const redirectUrl = resolveRedirectUrl(url, location)
|
||||
|
||||
validateUrlWithDNS(redirectUrl, 'redirectUrl')
|
||||
validateUrlWithDNS(redirectUrl, 'redirectUrl', { allowHttp: options.allowHttp })
|
||||
.then((validation) => {
|
||||
if (!validation.isValid) {
|
||||
reject(new Error(`Redirect blocked: ${validation.error}`))
|
||||
@@ -340,10 +408,12 @@ export async function secureFetchWithPinnedIP(
|
||||
*/
|
||||
export async function secureFetchWithValidation(
|
||||
url: string,
|
||||
options: SecureFetchOptions = {},
|
||||
options: SecureFetchOptions & { allowHttp?: boolean } = {},
|
||||
paramName = 'url'
|
||||
): Promise<SecureFetchResponse> {
|
||||
const validation = await validateUrlWithDNS(url, paramName)
|
||||
const validation = await validateUrlWithDNS(url, paramName, {
|
||||
allowHttp: options.allowHttp,
|
||||
})
|
||||
if (!validation.isValid) {
|
||||
throw new Error(validation.error)
|
||||
}
|
||||
|
||||
@@ -676,7 +676,8 @@ export function validateJiraIssueKey(
|
||||
*/
|
||||
export function validateExternalUrl(
|
||||
url: string | null | undefined,
|
||||
paramName = 'url'
|
||||
paramName = 'url',
|
||||
options: { allowHttp?: boolean } = {}
|
||||
): ValidationResult {
|
||||
if (!url || typeof url !== 'string') {
|
||||
return {
|
||||
@@ -709,7 +710,20 @@ export function validateExternalUrl(
|
||||
}
|
||||
}
|
||||
|
||||
if (protocol !== 'https:' && !(protocol === 'http:' && isLocalhost)) {
|
||||
if (options.allowHttp) {
|
||||
if (protocol !== 'https:' && protocol !== 'http:') {
|
||||
return {
|
||||
isValid: false,
|
||||
error: `${paramName} must use http:// or https:// protocol`,
|
||||
}
|
||||
}
|
||||
if (isLocalhost) {
|
||||
return {
|
||||
isValid: false,
|
||||
error: `${paramName} cannot point to localhost`,
|
||||
}
|
||||
}
|
||||
} else if (protocol !== 'https:' && !(protocol === 'http:' && isLocalhost)) {
|
||||
return {
|
||||
isValid: false,
|
||||
error: `${paramName} must use https:// protocol`,
|
||||
|
||||
@@ -246,7 +246,7 @@ describe('categorizeError', () => {
|
||||
const error = new Error('Server not accessible')
|
||||
const result = categorizeError(error)
|
||||
expect(result.status).toBe(404)
|
||||
expect(result.message).toBe('Server not accessible')
|
||||
expect(result.message).toBe('Resource not found')
|
||||
})
|
||||
|
||||
it.concurrent('returns 401 for authentication errors', () => {
|
||||
@@ -267,28 +267,28 @@ describe('categorizeError', () => {
|
||||
const error = new Error('Invalid parameter provided')
|
||||
const result = categorizeError(error)
|
||||
expect(result.status).toBe(400)
|
||||
expect(result.message).toBe('Invalid parameter provided')
|
||||
expect(result.message).toBe('Invalid request parameters')
|
||||
})
|
||||
|
||||
it.concurrent('returns 400 for missing required errors', () => {
|
||||
const error = new Error('Missing required field: name')
|
||||
const result = categorizeError(error)
|
||||
expect(result.status).toBe(400)
|
||||
expect(result.message).toBe('Missing required field: name')
|
||||
expect(result.message).toBe('Invalid request parameters')
|
||||
})
|
||||
|
||||
it.concurrent('returns 400 for validation errors', () => {
|
||||
const error = new Error('Validation failed for input')
|
||||
const result = categorizeError(error)
|
||||
expect(result.status).toBe(400)
|
||||
expect(result.message).toBe('Validation failed for input')
|
||||
expect(result.message).toBe('Invalid request parameters')
|
||||
})
|
||||
|
||||
it.concurrent('returns 500 for generic errors', () => {
|
||||
const error = new Error('Something went wrong')
|
||||
const result = categorizeError(error)
|
||||
expect(result.status).toBe(500)
|
||||
expect(result.message).toBe('Something went wrong')
|
||||
expect(result.message).toBe('Internal server error')
|
||||
})
|
||||
|
||||
it.concurrent('returns 500 for non-Error objects', () => {
|
||||
|
||||
@@ -49,18 +49,18 @@ export const MCP_CLIENT_CONSTANTS = {
|
||||
} as const
|
||||
|
||||
/**
|
||||
* Create standardized MCP error response
|
||||
* Create standardized MCP error response.
|
||||
* Always returns the defaultMessage to clients to prevent leaking internal error details.
|
||||
* Callers are responsible for logging the original error before calling this function.
|
||||
*/
|
||||
export function createMcpErrorResponse(
|
||||
error: unknown,
|
||||
_error: unknown,
|
||||
defaultMessage: string,
|
||||
status = 500
|
||||
): NextResponse {
|
||||
const errorMessage = error instanceof Error ? error.message : defaultMessage
|
||||
|
||||
const response: McpApiResponse = {
|
||||
success: false,
|
||||
error: errorMessage,
|
||||
error: defaultMessage,
|
||||
}
|
||||
|
||||
return NextResponse.json(response, { status })
|
||||
@@ -115,36 +115,33 @@ export function validateRequiredFields(
|
||||
}
|
||||
|
||||
/**
|
||||
* Enhanced error categorization for more specific HTTP status codes
|
||||
* Enhanced error categorization for more specific HTTP status codes.
|
||||
* Returns safe, generic messages to prevent leaking internal details.
|
||||
*/
|
||||
export function categorizeError(error: unknown): { message: string; status: number } {
|
||||
if (!(error instanceof Error)) {
|
||||
return { message: 'Unknown error occurred', status: 500 }
|
||||
}
|
||||
|
||||
const message = error.message.toLowerCase()
|
||||
const msg = error.message.toLowerCase()
|
||||
|
||||
if (message.includes('timeout')) {
|
||||
if (msg.includes('timeout')) {
|
||||
return { message: 'Request timed out', status: 408 }
|
||||
}
|
||||
|
||||
if (message.includes('not found') || message.includes('not accessible')) {
|
||||
return { message: error.message, status: 404 }
|
||||
if (msg.includes('not found') || msg.includes('not accessible')) {
|
||||
return { message: 'Resource not found', status: 404 }
|
||||
}
|
||||
|
||||
if (message.includes('authentication') || message.includes('unauthorized')) {
|
||||
if (msg.includes('authentication') || msg.includes('unauthorized')) {
|
||||
return { message: 'Authentication required', status: 401 }
|
||||
}
|
||||
|
||||
if (
|
||||
message.includes('invalid') ||
|
||||
message.includes('missing required') ||
|
||||
message.includes('validation')
|
||||
) {
|
||||
return { message: error.message, status: 400 }
|
||||
if (msg.includes('invalid') || msg.includes('missing required') || msg.includes('validation')) {
|
||||
return { message: 'Invalid request parameters', status: 400 }
|
||||
}
|
||||
|
||||
return { message: error.message, status: 500 }
|
||||
return { message: 'Internal server error', status: 500 }
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,16 +1,10 @@
|
||||
/**
|
||||
* Periodic memory telemetry for diagnosing heap growth in production.
|
||||
* Logs process.memoryUsage(), V8 heap stats, and active SSE connection
|
||||
* counts every 60s, enabling correlation between connection leaks and
|
||||
* memory spikes.
|
||||
* Periodic memory telemetry for monitoring heap growth in production.
|
||||
* Logs process.memoryUsage() and V8 heap stats every 60s.
|
||||
*/
|
||||
|
||||
import v8 from 'node:v8'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import {
|
||||
getActiveSSEConnectionCount,
|
||||
getActiveSSEConnectionsByRoute,
|
||||
} from '@/lib/monitoring/sse-connections'
|
||||
|
||||
const logger = createLogger('MemoryTelemetry', { logLevel: 'INFO' })
|
||||
|
||||
@@ -23,16 +17,6 @@ export function startMemoryTelemetry(intervalMs = 60_000) {
|
||||
started = true
|
||||
|
||||
const timer = setInterval(() => {
|
||||
// Trigger opportunistic (non-blocking) garbage collection if running on Bun.
|
||||
// This signals JSC GC + mimalloc page purge without blocking the event loop,
|
||||
// helping reclaim RSS that mimalloc otherwise retains under sustained load.
|
||||
const bunGlobal = (globalThis as Record<string, unknown>).Bun as
|
||||
| { gc?: (force: boolean) => void }
|
||||
| undefined
|
||||
if (typeof bunGlobal?.gc === 'function') {
|
||||
bunGlobal.gc(false)
|
||||
}
|
||||
|
||||
const mem = process.memoryUsage()
|
||||
const heap = v8.getHeapStatistics()
|
||||
|
||||
@@ -49,8 +33,6 @@ export function startMemoryTelemetry(intervalMs = 60_000) {
|
||||
? process.getActiveResourcesInfo().length
|
||||
: -1,
|
||||
uptimeMin: Math.round(process.uptime() / 60),
|
||||
activeSSEConnections: getActiveSSEConnectionCount(),
|
||||
sseByRoute: getActiveSSEConnectionsByRoute(),
|
||||
})
|
||||
}, intervalMs)
|
||||
timer.unref()
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
/**
|
||||
* Tracks active SSE connections by route for memory leak diagnostics.
|
||||
* Logged alongside periodic memory telemetry to correlate connection
|
||||
* counts with heap growth.
|
||||
*/
|
||||
|
||||
const connections = new Map<string, number>()
|
||||
|
||||
export function incrementSSEConnections(route: string) {
|
||||
connections.set(route, (connections.get(route) ?? 0) + 1)
|
||||
}
|
||||
|
||||
export function decrementSSEConnections(route: string) {
|
||||
const count = (connections.get(route) ?? 0) - 1
|
||||
if (count <= 0) connections.delete(route)
|
||||
else connections.set(route, count)
|
||||
}
|
||||
|
||||
export function getActiveSSEConnectionCount(): number {
|
||||
let total = 0
|
||||
for (const count of connections.values()) total += count
|
||||
return total
|
||||
}
|
||||
|
||||
export function getActiveSSEConnectionsByRoute(): Record<string, number> {
|
||||
return Object.fromEntries(connections)
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
import { db, webhook, workflow, workflowDeploymentVersion } from '@sim/db'
|
||||
import { account, credentialSet, subscription } from '@sim/db/schema'
|
||||
import { credentialSet, subscription } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, eq, isNull, or } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
@@ -7,6 +7,7 @@ import { v4 as uuidv4 } from 'uuid'
|
||||
import { checkEnterprisePlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils'
|
||||
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
|
||||
import { isProd } from '@/lib/core/config/feature-flags'
|
||||
import { safeCompare } from '@/lib/core/security/encryption'
|
||||
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
import { convertSquareBracketsToTwiML } from '@/lib/webhooks/utils'
|
||||
@@ -25,8 +26,6 @@ import {
|
||||
validateTypeformSignature,
|
||||
verifyProviderWebhook,
|
||||
} from '@/lib/webhooks/utils.server'
|
||||
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
|
||||
import { resolveOAuthAccountId } from '@/app/api/auth/oauth/utils'
|
||||
import { executeWebhookJob } from '@/background/webhook-execution'
|
||||
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
|
||||
import { isConfluencePayloadMatch } from '@/triggers/confluence/utils'
|
||||
@@ -40,6 +39,12 @@ export interface WebhookProcessorOptions {
|
||||
requestId: string
|
||||
path?: string
|
||||
webhookId?: string
|
||||
actorUserId?: string
|
||||
}
|
||||
|
||||
export interface WebhookPreprocessingResult {
|
||||
error: NextResponse | null
|
||||
actorUserId?: string
|
||||
}
|
||||
|
||||
function getExternalUrl(request: NextRequest): string {
|
||||
@@ -800,14 +805,14 @@ export async function verifyProviderAuth(
|
||||
|
||||
if (secretHeaderName) {
|
||||
const headerValue = request.headers.get(secretHeaderName.toLowerCase())
|
||||
if (headerValue === configToken) {
|
||||
if (headerValue && safeCompare(headerValue, configToken)) {
|
||||
isTokenValid = true
|
||||
}
|
||||
} else {
|
||||
const authHeader = request.headers.get('authorization')
|
||||
if (authHeader?.toLowerCase().startsWith('bearer ')) {
|
||||
const token = authHeader.substring(7)
|
||||
if (token === configToken) {
|
||||
if (safeCompare(token, configToken)) {
|
||||
isTokenValid = true
|
||||
}
|
||||
}
|
||||
@@ -835,7 +840,7 @@ export async function checkWebhookPreprocessing(
|
||||
foundWorkflow: any,
|
||||
foundWebhook: any,
|
||||
requestId: string
|
||||
): Promise<NextResponse | null> {
|
||||
): Promise<WebhookPreprocessingResult> {
|
||||
try {
|
||||
const executionId = uuidv4()
|
||||
|
||||
@@ -848,6 +853,7 @@ export async function checkWebhookPreprocessing(
|
||||
checkRateLimit: true,
|
||||
checkDeployment: true,
|
||||
workspaceId: foundWorkflow.workspaceId,
|
||||
workflowRecord: foundWorkflow,
|
||||
})
|
||||
|
||||
if (!preprocessResult.success) {
|
||||
@@ -859,33 +865,39 @@ export async function checkWebhookPreprocessing(
|
||||
})
|
||||
|
||||
if (foundWebhook.provider === 'microsoft-teams') {
|
||||
return NextResponse.json(
|
||||
{
|
||||
type: 'message',
|
||||
text: error.message,
|
||||
},
|
||||
{ status: error.statusCode }
|
||||
)
|
||||
return {
|
||||
error: NextResponse.json(
|
||||
{
|
||||
type: 'message',
|
||||
text: error.message,
|
||||
},
|
||||
{ status: error.statusCode }
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
return NextResponse.json({ error: error.message }, { status: error.statusCode })
|
||||
return { error: NextResponse.json({ error: error.message }, { status: error.statusCode }) }
|
||||
}
|
||||
|
||||
return null
|
||||
return { error: null, actorUserId: preprocessResult.actorUserId }
|
||||
} catch (preprocessError) {
|
||||
logger.error(`[${requestId}] Error during webhook preprocessing:`, preprocessError)
|
||||
|
||||
if (foundWebhook.provider === 'microsoft-teams') {
|
||||
return NextResponse.json(
|
||||
{
|
||||
type: 'message',
|
||||
text: 'Internal error during preprocessing',
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
return {
|
||||
error: NextResponse.json(
|
||||
{
|
||||
type: 'message',
|
||||
text: 'Internal error during preprocessing',
|
||||
},
|
||||
{ status: 500 }
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
return NextResponse.json({ error: 'Internal error during preprocessing' }, { status: 500 })
|
||||
return {
|
||||
error: NextResponse.json({ error: 'Internal error during preprocessing' }, { status: 500 }),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1059,22 +1071,7 @@ export async function queueWebhookExecution(
|
||||
// Note: Each webhook now has its own credentialId (credential sets are fanned out at save time)
|
||||
const providerConfig = (foundWebhook.providerConfig as Record<string, any>) || {}
|
||||
const credentialId = providerConfig.credentialId as string | undefined
|
||||
let credentialAccountUserId: string | undefined
|
||||
if (credentialId) {
|
||||
const resolved = await resolveOAuthAccountId(credentialId)
|
||||
if (!resolved) {
|
||||
logger.error(
|
||||
`[${options.requestId}] Failed to resolve OAuth account for credential ${credentialId}`
|
||||
)
|
||||
return formatProviderErrorResponse(foundWebhook, 'Failed to resolve credential', 500)
|
||||
}
|
||||
const [credentialRecord] = await db
|
||||
.select({ userId: account.userId })
|
||||
.from(account)
|
||||
.where(eq(account.id, resolved.accountId))
|
||||
.limit(1)
|
||||
credentialAccountUserId = credentialRecord?.userId
|
||||
}
|
||||
|
||||
// credentialSetId is a direct field on webhook table, not in providerConfig
|
||||
const credentialSetId = foundWebhook.credentialSetId as string | undefined
|
||||
|
||||
@@ -1089,16 +1086,9 @@ export async function queueWebhookExecution(
|
||||
}
|
||||
}
|
||||
|
||||
if (!foundWorkflow.workspaceId) {
|
||||
logger.error(`[${options.requestId}] Workflow ${foundWorkflow.id} has no workspaceId`)
|
||||
return NextResponse.json({ error: 'Workflow has no associated workspace' }, { status: 500 })
|
||||
}
|
||||
|
||||
const actorUserId = await getWorkspaceBilledAccountUserId(foundWorkflow.workspaceId)
|
||||
const actorUserId = options.actorUserId
|
||||
if (!actorUserId) {
|
||||
logger.error(
|
||||
`[${options.requestId}] No billing account for workspace ${foundWorkflow.workspaceId}`
|
||||
)
|
||||
logger.error(`[${options.requestId}] No actorUserId provided for webhook ${foundWebhook.id}`)
|
||||
return NextResponse.json({ error: 'Unable to resolve billing account' }, { status: 500 })
|
||||
}
|
||||
|
||||
@@ -1111,8 +1101,8 @@ export async function queueWebhookExecution(
|
||||
headers,
|
||||
path: options.path || foundWebhook.path,
|
||||
blockId: foundWebhook.blockId,
|
||||
workspaceId: foundWorkflow.workspaceId,
|
||||
...(credentialId ? { credentialId } : {}),
|
||||
...(credentialAccountUserId ? { credentialAccountUserId } : {}),
|
||||
}
|
||||
|
||||
const jobQueue = await getJobQueue()
|
||||
@@ -1166,6 +1156,12 @@ export async function queueWebhookExecution(
|
||||
})
|
||||
}
|
||||
|
||||
// Slack requires an empty 200 for interactive payloads (view_submission, block_actions, etc.)
|
||||
// A JSON body like {"message":"..."} is not a recognized response format and causes modal errors
|
||||
if (foundWebhook.provider === 'slack') {
|
||||
return new NextResponse(null, { status: 200 })
|
||||
}
|
||||
|
||||
// Twilio Voice requires TwiML XML response
|
||||
if (foundWebhook.provider === 'twilio_voice') {
|
||||
const providerConfig = (foundWebhook.providerConfig as Record<string, any>) || {}
|
||||
@@ -1211,6 +1207,12 @@ export async function queueWebhookExecution(
|
||||
)
|
||||
}
|
||||
|
||||
if (foundWebhook.provider === 'slack') {
|
||||
// Return empty 200 to avoid Slack showing an error dialog to the user,
|
||||
// even though processing failed. The error is already logged above.
|
||||
return new NextResponse(null, { status: 200 })
|
||||
}
|
||||
|
||||
if (foundWebhook.provider === 'twilio_voice') {
|
||||
const errorTwiml = `<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Response>
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { IncomingMessage, ServerResponse } from 'http'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import { safeCompare } from '@/lib/core/security/encryption'
|
||||
import type { IRoomManager } from '@/socket/rooms'
|
||||
|
||||
interface Logger {
|
||||
@@ -21,7 +22,8 @@ function checkInternalApiKey(req: IncomingMessage): { success: boolean; error?:
|
||||
return { success: false, error: 'API key required' }
|
||||
}
|
||||
|
||||
if (apiKey !== expectedApiKey) {
|
||||
const apiKeyStr = Array.isArray(apiKey) ? apiKey[0] : apiKey
|
||||
if (!apiKeyStr || !safeCompare(apiKeyStr, expectedApiKey)) {
|
||||
return { success: false, error: 'Invalid API key' }
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
|
||||
id: 'parallel_deep_research',
|
||||
name: 'Parallel AI Deep Research',
|
||||
description:
|
||||
'Conduct comprehensive deep research across the web using Parallel AI. Synthesizes information from multiple sources with citations. Can take up to 15 minutes to complete.',
|
||||
'Conduct comprehensive deep research across the web using Parallel AI. Synthesizes information from multiple sources with citations. Can take up to 45 minutes to complete.',
|
||||
version: '1.0.0',
|
||||
|
||||
params: {
|
||||
@@ -22,8 +22,7 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
|
||||
type: 'string',
|
||||
required: false,
|
||||
visibility: 'user-only',
|
||||
description:
|
||||
'Compute level: base, lite, pro, ultra, ultra2x, ultra4x, ultra8x (default: base)',
|
||||
description: 'Processing tier: pro, ultra, pro-fast, ultra-fast (default: pro)',
|
||||
},
|
||||
include_domains: {
|
||||
type: 'string',
|
||||
@@ -55,15 +54,12 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
|
||||
body: (params) => {
|
||||
const body: Record<string, unknown> = {
|
||||
input: params.input,
|
||||
processor: params.processor || 'base',
|
||||
processor: params.processor || 'pro',
|
||||
task_spec: {
|
||||
output_schema: 'auto',
|
||||
},
|
||||
}
|
||||
|
||||
const taskSpec: Record<string, unknown> = {}
|
||||
|
||||
taskSpec.output_schema = 'auto'
|
||||
|
||||
body.task_spec = taskSpec
|
||||
|
||||
if (params.include_domains || params.exclude_domains) {
|
||||
const sourcePolicy: Record<string, string[]> = {}
|
||||
|
||||
@@ -91,14 +87,21 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
|
||||
},
|
||||
|
||||
transformResponse: async (response: Response) => {
|
||||
if (!response.ok) {
|
||||
const errorText = await response.text()
|
||||
throw new Error(
|
||||
`Parallel AI deep research task creation failed: ${response.status} - ${errorText}`
|
||||
)
|
||||
}
|
||||
|
||||
const data = await response.json()
|
||||
|
||||
return {
|
||||
success: true,
|
||||
output: {
|
||||
run_id: data.run_id,
|
||||
status: data.status,
|
||||
message: `Research task ${data.status}, waiting for completion...`,
|
||||
run_id: data.run_id ?? null,
|
||||
status: data.status ?? null,
|
||||
message: `Research task ${data.status ?? 'created'}, waiting for completion...`,
|
||||
content: {},
|
||||
basis: [],
|
||||
},
|
||||
@@ -122,13 +125,16 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
|
||||
logger.info(`Parallel AI deep research task ${runId} created, fetching results...`)
|
||||
|
||||
try {
|
||||
const resultResponse = await fetch(`https://api.parallel.ai/v1/tasks/runs/${runId}/result`, {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
'x-api-key': params.apiKey,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
})
|
||||
const resultResponse = await fetch(
|
||||
`https://api.parallel.ai/v1/tasks/runs/${String(runId).trim()}/result`,
|
||||
{
|
||||
method: 'GET',
|
||||
headers: {
|
||||
'x-api-key': params.apiKey,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
if (!resultResponse.ok) {
|
||||
const errorText = await resultResponse.text()
|
||||
@@ -138,17 +144,17 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
|
||||
const taskResult = await resultResponse.json()
|
||||
logger.info(`Parallel AI deep research task ${runId} completed`)
|
||||
|
||||
const output = taskResult.output || {}
|
||||
const run = taskResult.run || {}
|
||||
const output = taskResult.output ?? {}
|
||||
const status = taskResult.status ?? 'completed'
|
||||
|
||||
return {
|
||||
success: true,
|
||||
output: {
|
||||
status: run.status || 'completed',
|
||||
status,
|
||||
run_id: runId,
|
||||
message: 'Research completed successfully',
|
||||
content: output.content || {},
|
||||
basis: output.basis || [],
|
||||
content: output.content ?? {},
|
||||
basis: output.basis ?? [],
|
||||
},
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
@@ -169,7 +175,7 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
|
||||
outputs: {
|
||||
status: {
|
||||
type: 'string',
|
||||
description: 'Task status (completed, failed)',
|
||||
description: 'Task status (completed, failed, running)',
|
||||
},
|
||||
run_id: {
|
||||
type: 'string',
|
||||
@@ -189,7 +195,7 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
|
||||
items: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
field: { type: 'string', description: 'Output field name' },
|
||||
field: { type: 'string', description: 'Output field dot-notation path' },
|
||||
reasoning: { type: 'string', description: 'Explanation for the result' },
|
||||
citations: {
|
||||
type: 'array',
|
||||
@@ -203,7 +209,7 @@ export const deepResearchTool: ToolConfig<ParallelDeepResearchParams, ToolRespon
|
||||
},
|
||||
},
|
||||
},
|
||||
confidence: { type: 'string', description: 'Confidence level indicator' },
|
||||
confidence: { type: 'string', description: 'Confidence level (high, medium)' },
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -17,21 +17,21 @@ export const extractTool: ToolConfig<ParallelExtractParams, ToolResponse> = {
|
||||
},
|
||||
objective: {
|
||||
type: 'string',
|
||||
required: true,
|
||||
required: false,
|
||||
visibility: 'user-or-llm',
|
||||
description: 'What information to extract from the provided URLs',
|
||||
},
|
||||
excerpts: {
|
||||
type: 'boolean',
|
||||
required: true,
|
||||
visibility: 'user-only',
|
||||
description: 'Include relevant excerpts from the content',
|
||||
required: false,
|
||||
visibility: 'user-or-llm',
|
||||
description: 'Include relevant excerpts from the content (default: true)',
|
||||
},
|
||||
full_content: {
|
||||
type: 'boolean',
|
||||
required: true,
|
||||
visibility: 'user-only',
|
||||
description: 'Include full page content',
|
||||
required: false,
|
||||
visibility: 'user-or-llm',
|
||||
description: 'Include full page content as markdown (default: false)',
|
||||
},
|
||||
apiKey: {
|
||||
type: 'string',
|
||||
@@ -50,7 +50,6 @@ export const extractTool: ToolConfig<ParallelExtractParams, ToolResponse> = {
|
||||
'parallel-beta': 'search-extract-2025-10-10',
|
||||
}),
|
||||
body: (params) => {
|
||||
// Convert comma-separated URLs to array
|
||||
const urlArray = params.urls
|
||||
.split(',')
|
||||
.map((url) => url.trim())
|
||||
@@ -58,10 +57,9 @@ export const extractTool: ToolConfig<ParallelExtractParams, ToolResponse> = {
|
||||
|
||||
const body: Record<string, unknown> = {
|
||||
urls: urlArray,
|
||||
objective: params.objective,
|
||||
}
|
||||
|
||||
// Add optional parameters if provided
|
||||
if (params.objective) body.objective = params.objective
|
||||
if (params.excerpts !== undefined) body.excerpts = params.excerpts
|
||||
if (params.full_content !== undefined) body.full_content = params.full_content
|
||||
|
||||
@@ -70,17 +68,44 @@ export const extractTool: ToolConfig<ParallelExtractParams, ToolResponse> = {
|
||||
},
|
||||
|
||||
transformResponse: async (response: Response) => {
|
||||
if (!response.ok) {
|
||||
const errorText = await response.text()
|
||||
throw new Error(`Parallel AI extract failed: ${response.status} - ${errorText}`)
|
||||
}
|
||||
|
||||
const data = await response.json()
|
||||
|
||||
if (!data.results) {
|
||||
return {
|
||||
success: false,
|
||||
error: 'No results returned from extraction',
|
||||
output: {
|
||||
results: [],
|
||||
extract_id: data.extract_id ?? null,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
output: {
|
||||
results: data.results || [],
|
||||
extract_id: data.extract_id ?? null,
|
||||
results: data.results.map((result: Record<string, unknown>) => ({
|
||||
url: result.url ?? null,
|
||||
title: result.title ?? null,
|
||||
publish_date: result.publish_date ?? null,
|
||||
excerpts: result.excerpts ?? [],
|
||||
full_content: result.full_content ?? null,
|
||||
})),
|
||||
},
|
||||
}
|
||||
},
|
||||
|
||||
outputs: {
|
||||
extract_id: {
|
||||
type: 'string',
|
||||
description: 'Unique identifier for this extraction request',
|
||||
},
|
||||
results: {
|
||||
type: 'array',
|
||||
description: 'Extracted information from the provided URLs',
|
||||
@@ -88,12 +113,22 @@ export const extractTool: ToolConfig<ParallelExtractParams, ToolResponse> = {
|
||||
type: 'object',
|
||||
properties: {
|
||||
url: { type: 'string', description: 'The source URL' },
|
||||
title: { type: 'string', description: 'The title of the page' },
|
||||
content: { type: 'string', description: 'Extracted content' },
|
||||
title: { type: 'string', description: 'The title of the page', optional: true },
|
||||
publish_date: {
|
||||
type: 'string',
|
||||
description: 'Publication date (YYYY-MM-DD)',
|
||||
optional: true,
|
||||
},
|
||||
excerpts: {
|
||||
type: 'array',
|
||||
description: 'Relevant text excerpts',
|
||||
description: 'Relevant text excerpts in markdown',
|
||||
items: { type: 'string' },
|
||||
optional: true,
|
||||
},
|
||||
full_content: {
|
||||
type: 'string',
|
||||
description: 'Full page content as markdown',
|
||||
optional: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -5,3 +5,5 @@ import { searchTool } from '@/tools/parallel/search'
|
||||
export const parallelSearchTool = searchTool
|
||||
export const parallelExtractTool = extractTool
|
||||
export const parallelDeepResearchTool = deepResearchTool
|
||||
|
||||
export * from './types'
|
||||
|
||||
@@ -19,25 +19,37 @@ export const searchTool: ToolConfig<ParallelSearchParams, ToolResponse> = {
|
||||
type: 'string',
|
||||
required: false,
|
||||
visibility: 'user-or-llm',
|
||||
description: 'Optional comma-separated list of search queries to execute',
|
||||
description: 'Comma-separated list of search queries to execute',
|
||||
},
|
||||
processor: {
|
||||
mode: {
|
||||
type: 'string',
|
||||
required: false,
|
||||
visibility: 'user-only',
|
||||
description: 'Processing method: base or pro (default: base)',
|
||||
description: 'Search mode: one-shot, agentic, or fast (default: one-shot)',
|
||||
},
|
||||
max_results: {
|
||||
type: 'number',
|
||||
required: false,
|
||||
visibility: 'user-only',
|
||||
description: 'Maximum number of results to return (default: 5)',
|
||||
description: 'Maximum number of results to return (default: 10)',
|
||||
},
|
||||
max_chars_per_result: {
|
||||
type: 'number',
|
||||
required: false,
|
||||
visibility: 'user-only',
|
||||
description: 'Maximum characters per result (default: 1500)',
|
||||
description: 'Maximum characters per result excerpt (minimum: 1000)',
|
||||
},
|
||||
include_domains: {
|
||||
type: 'string',
|
||||
required: false,
|
||||
visibility: 'user-or-llm',
|
||||
description: 'Comma-separated list of domains to restrict search results to',
|
||||
},
|
||||
exclude_domains: {
|
||||
type: 'string',
|
||||
required: false,
|
||||
visibility: 'user-or-llm',
|
||||
description: 'Comma-separated list of domains to exclude from search results',
|
||||
},
|
||||
apiKey: {
|
||||
type: 'string',
|
||||
@@ -60,44 +72,83 @@ export const searchTool: ToolConfig<ParallelSearchParams, ToolResponse> = {
|
||||
objective: params.objective,
|
||||
}
|
||||
|
||||
// Only include search_queries if it's not empty
|
||||
if (
|
||||
params.search_queries !== undefined &&
|
||||
params.search_queries !== null &&
|
||||
params.search_queries.length > 0
|
||||
) {
|
||||
body.search_queries = params.search_queries
|
||||
if (params.search_queries) {
|
||||
if (Array.isArray(params.search_queries)) {
|
||||
body.search_queries = params.search_queries
|
||||
} else if (typeof params.search_queries === 'string') {
|
||||
const queries = params.search_queries
|
||||
.split(',')
|
||||
.map((q: string) => q.trim())
|
||||
.filter((q: string) => q.length > 0)
|
||||
if (queries.length > 0) body.search_queries = queries
|
||||
}
|
||||
}
|
||||
|
||||
// Add optional parameters if provided
|
||||
if (params.processor) body.processor = params.processor
|
||||
if (params.mode) body.mode = params.mode
|
||||
if (params.max_results) body.max_results = Number(params.max_results)
|
||||
if (params.max_chars_per_result)
|
||||
body.max_chars_per_result = Number(params.max_chars_per_result)
|
||||
if (params.max_chars_per_result) {
|
||||
body.excerpts = { max_chars_per_result: Number(params.max_chars_per_result) }
|
||||
}
|
||||
|
||||
const sourcePolicy: Record<string, string[]> = {}
|
||||
if (params.include_domains) {
|
||||
sourcePolicy.include_domains = params.include_domains
|
||||
.split(',')
|
||||
.map((d: string) => d.trim())
|
||||
.filter((d: string) => d.length > 0)
|
||||
}
|
||||
if (params.exclude_domains) {
|
||||
sourcePolicy.exclude_domains = params.exclude_domains
|
||||
.split(',')
|
||||
.map((d: string) => d.trim())
|
||||
.filter((d: string) => d.length > 0)
|
||||
}
|
||||
if (Object.keys(sourcePolicy).length > 0) {
|
||||
body.source_policy = sourcePolicy
|
||||
}
|
||||
|
||||
return body
|
||||
},
|
||||
},
|
||||
|
||||
transformResponse: async (response: Response) => {
|
||||
if (!response.ok) {
|
||||
const errorText = await response.text()
|
||||
throw new Error(`Parallel AI search failed: ${response.status} - ${errorText}`)
|
||||
}
|
||||
|
||||
const data = await response.json()
|
||||
|
||||
if (!data.results) {
|
||||
return {
|
||||
success: false,
|
||||
error: 'No results returned from search',
|
||||
output: {
|
||||
results: [],
|
||||
search_id: data.search_id ?? null,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
output: {
|
||||
results: data.results.map((result: unknown) => {
|
||||
const resultObj = result as Record<string, unknown>
|
||||
return {
|
||||
url: resultObj.url || '',
|
||||
title: resultObj.title || '',
|
||||
excerpts: resultObj.excerpts || [],
|
||||
}
|
||||
}),
|
||||
search_id: data.search_id ?? null,
|
||||
results: data.results.map((result: Record<string, unknown>) => ({
|
||||
url: result.url ?? null,
|
||||
title: result.title ?? null,
|
||||
publish_date: result.publish_date ?? null,
|
||||
excerpts: result.excerpts ?? [],
|
||||
})),
|
||||
},
|
||||
}
|
||||
},
|
||||
|
||||
outputs: {
|
||||
search_id: {
|
||||
type: 'string',
|
||||
description: 'Unique identifier for this search request',
|
||||
},
|
||||
results: {
|
||||
type: 'array',
|
||||
description: 'Search results with excerpts from relevant pages',
|
||||
@@ -106,9 +157,14 @@ export const searchTool: ToolConfig<ParallelSearchParams, ToolResponse> = {
|
||||
properties: {
|
||||
url: { type: 'string', description: 'The URL of the search result' },
|
||||
title: { type: 'string', description: 'The title of the search result' },
|
||||
publish_date: {
|
||||
type: 'string',
|
||||
description: 'Publication date of the page (YYYY-MM-DD)',
|
||||
optional: true,
|
||||
},
|
||||
excerpts: {
|
||||
type: 'array',
|
||||
description: 'Text excerpts from the page',
|
||||
description: 'LLM-optimized excerpts from the page',
|
||||
items: { type: 'string' },
|
||||
},
|
||||
},
|
||||
|
||||
@@ -1,39 +1,51 @@
|
||||
import type { ToolResponse } from '@/tools/types'
|
||||
|
||||
export interface ParallelSearchParams {
|
||||
objective: string
|
||||
search_queries: string[]
|
||||
processor?: string
|
||||
search_queries?: string[] | string
|
||||
mode?: string
|
||||
max_results?: number
|
||||
max_chars_per_result?: number
|
||||
include_domains?: string
|
||||
exclude_domains?: string
|
||||
apiKey: string
|
||||
}
|
||||
|
||||
export interface ParallelSearchResult {
|
||||
url: string
|
||||
title: string
|
||||
url: string | null
|
||||
title: string | null
|
||||
publish_date?: string | null
|
||||
excerpts: string[]
|
||||
}
|
||||
|
||||
export interface ParallelSearchResponse {
|
||||
results: ParallelSearchResult[]
|
||||
export interface ParallelSearchResponse extends ToolResponse {
|
||||
output: {
|
||||
search_id: string | null
|
||||
results: ParallelSearchResult[]
|
||||
}
|
||||
}
|
||||
|
||||
export interface ParallelExtractParams {
|
||||
urls: string
|
||||
objective: string
|
||||
excerpts: boolean
|
||||
full_content: boolean
|
||||
objective?: string
|
||||
excerpts?: boolean
|
||||
full_content?: boolean
|
||||
apiKey: string
|
||||
}
|
||||
|
||||
export interface ParallelExtractResult {
|
||||
url: string
|
||||
title: string
|
||||
content?: string
|
||||
url: string | null
|
||||
title?: string | null
|
||||
publish_date?: string | null
|
||||
excerpts?: string[]
|
||||
full_content?: string | null
|
||||
}
|
||||
|
||||
export interface ParallelExtractResponse {
|
||||
results: ParallelExtractResult[]
|
||||
export interface ParallelExtractResponse extends ToolResponse {
|
||||
output: {
|
||||
extract_id: string | null
|
||||
results: ParallelExtractResult[]
|
||||
}
|
||||
}
|
||||
|
||||
export interface ParallelDeepResearchParams {
|
||||
@@ -45,17 +57,22 @@ export interface ParallelDeepResearchParams {
|
||||
}
|
||||
|
||||
export interface ParallelDeepResearchBasis {
|
||||
url: string
|
||||
title: string
|
||||
excerpt: string
|
||||
confidence?: number
|
||||
field: string
|
||||
reasoning: string
|
||||
citations: {
|
||||
url: string
|
||||
title: string
|
||||
excerpts: string[]
|
||||
}[]
|
||||
confidence: string
|
||||
}
|
||||
|
||||
export interface ParallelDeepResearchResponse {
|
||||
status: string
|
||||
run_id: string
|
||||
message?: string
|
||||
content?: Record<string, unknown>
|
||||
basis?: ParallelDeepResearchBasis[]
|
||||
metadata?: Record<string, unknown>
|
||||
export interface ParallelDeepResearchResponse extends ToolResponse {
|
||||
output: {
|
||||
status: string
|
||||
run_id: string
|
||||
message: string
|
||||
content: Record<string, unknown>
|
||||
basis: ParallelDeepResearchBasis[]
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ export const genericWebhookTrigger: TriggerConfig = {
|
||||
title: 'Require Authentication',
|
||||
type: 'switch',
|
||||
description: 'Require authentication for all webhook requests',
|
||||
defaultValue: false,
|
||||
defaultValue: true,
|
||||
mode: 'trigger',
|
||||
},
|
||||
{
|
||||
@@ -36,6 +36,7 @@ export const genericWebhookTrigger: TriggerConfig = {
|
||||
description: 'Token used to authenticate webhook requests via Bearer token or custom header',
|
||||
password: true,
|
||||
required: false,
|
||||
value: () => crypto.randomUUID(),
|
||||
mode: 'trigger',
|
||||
},
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user