fix(stateless): add support for oauth credentials in stateless executions

This commit is contained in:
Waleed Latif
2025-03-09 18:13:20 -07:00
parent b8c30396cb
commit af30c296f4
9 changed files with 44 additions and 51 deletions

View File

@@ -118,7 +118,9 @@ export async function POST(request: NextRequest) {
})
if (!response.ok) {
throw new Error('Failed to refresh token')
const errorText = await response.text()
console.error('Token refresh failed:', response.status, errorText)
throw new Error(`Failed to refresh token: ${response.status} ${errorText}`)
}
const data = await response.json()

View File

@@ -18,7 +18,6 @@ export async function POST(request: Request) {
// Use executeTool with skipProxy=true to prevent recursive proxy calls
const result = await executeTool(toolId, params, true)
console.log('result', result)
if (!result.success) {
throw new Error(

View File

@@ -152,7 +152,6 @@ const runningExecutions = new Set<string>()
// Add GET handler for cron job
export async function GET(req: NextRequest) {
const now = new Date()
console.log('Starting scheduled execution check at:', now.toISOString())
let dueSchedules: (typeof workflowSchedule.$inferSelect)[] = []
@@ -165,22 +164,12 @@ export async function GET(req: NextRequest) {
// Limit to 10 workflows per minute to prevent overload
.limit(10)
console.log('Found due schedules:', dueSchedules.length)
for (const schedule of dueSchedules) {
console.log('Processing schedule:', {
id: schedule.id,
workflowId: schedule.workflowId,
nextRunAt: schedule.nextRunAt,
cronExpression: schedule.cronExpression,
})
const executionId = uuidv4()
try {
// Skip if this workflow is already running
if (runningExecutions.has(schedule.workflowId)) {
console.log(`Skipping workflow ${schedule.workflowId} - already running`)
continue
}
@@ -298,7 +287,6 @@ export async function GET(req: NextRequest) {
if (result.success) {
// Calculate the next run time based on the schedule configuration
const nextRunAt = calculateNextRunTime(schedule, blocks)
console.log('Calculated next run time:', nextRunAt.toISOString())
// Update the schedule with the next run time
await db
@@ -309,8 +297,6 @@ export async function GET(req: NextRequest) {
nextRunAt,
})
.where(eq(workflowSchedule.id, schedule.id))
console.log('Updated schedule with new run time')
} else {
// If execution failed, increment next_run_at by a small delay to prevent immediate retries
const retryDelay = 1 * 60 * 1000 // 1 minute delay
@@ -323,8 +309,6 @@ export async function GET(req: NextRequest) {
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, schedule.id))
console.log('Execution failed, scheduled retry at:', nextRetryAt.toISOString())
}
} catch (error: any) {
console.error(`Error executing scheduled workflow ${schedule.workflowId}:`, error)
@@ -343,8 +327,6 @@ export async function GET(req: NextRequest) {
nextRunAt: nextRetryAt,
})
.where(eq(workflowSchedule.id, schedule.id))
console.log('Execution error, scheduled retry at:', nextRetryAt.toISOString())
} finally {
runningExecutions.delete(schedule.workflowId)
}

View File

@@ -48,12 +48,6 @@ export async function GET(request: NextRequest) {
// Construct the WhatsApp verification URL
const whatsappUrl = `${webhookUrl}?hub.mode=subscribe&hub.verify_token=${verificationToken}&hub.challenge=${challenge}`
console.log('Testing WhatsApp webhook:', {
webhookId,
url: whatsappUrl,
token: verificationToken ? verificationToken.substring(0, 3) + '***' : null,
})
// Make a request to the webhook endpoint
const response = await fetch(whatsappUrl, {
headers: {
@@ -66,12 +60,6 @@ export async function GET(request: NextRequest) {
const contentType = response.headers.get('content-type')
const responseText = await response.text()
console.log('WhatsApp test response:', {
status,
contentType,
responseText,
})
// Check if the test was successful
const success = status === 200 && responseText === challenge

View File

@@ -275,6 +275,8 @@ export async function POST(
workflowId: foundWorkflow.id,
}
console.log('Executing workflow with workflowId:', foundWorkflow.id)
const executor = new Executor(
serializedWorkflow,
currentBlockStates,

View File

@@ -192,7 +192,6 @@ export function WebhookConfig({ blockId, subBlockId, isConnecting }: WebhookConf
// Update the actual provider after saving
setActualProvider(webhookProvider || 'generic')
console.log('Webhook configuration saved successfully')
return true
} catch (error: any) {
console.error('Error saving webhook:', error)
@@ -223,7 +222,6 @@ export function WebhookConfig({ blockId, subBlockId, isConnecting }: WebhookConf
setWebhookId(null)
setActualProvider(null)
console.log('Webhook deleted successfully')
return true
} catch (error: any) {
console.error('Error deleting webhook:', error)

View File

@@ -535,7 +535,10 @@ export class ApiBlockHandler implements BlockHandler {
throw new Error(`Tool not found: ${block.config.tool}`)
}
const result = await executeTool(block.config.tool, inputs)
const result = await executeTool(block.config.tool, {
...inputs,
_context: { workflowId: context.workflowId },
})
if (!result.success) {
throw new Error(result.error || `API request failed with no error message`)
}
@@ -563,7 +566,14 @@ export class FunctionBlockHandler implements BlockHandler {
: inputs.code
// Use the shared helper function
const result = await executeCodeWithFallback(codeContent, inputs, inputs.timeout || 5000)
const result = await executeCodeWithFallback(
codeContent,
{
...inputs,
_context: { workflowId: context.workflowId },
},
inputs.timeout || 5000
)
if (!result.success) {
throw new Error(result.error || 'Function execution failed')
@@ -592,7 +602,10 @@ export class GenericBlockHandler implements BlockHandler {
throw new Error(`Tool not found: ${block.config.tool}`)
}
const result = await executeTool(block.config.tool, inputs)
const result = await executeTool(block.config.tool, {
...inputs,
_context: { workflowId: context.workflowId },
})
if (!result.success) {
throw new Error(result.error || `Block execution failed with no error message`)
}

View File

@@ -26,7 +26,7 @@ import { sheetsReadTool, sheetsUpdateTool, sheetsWriteTool } from './sheets'
import { slackMessageTool } from './slack/message'
import { supabaseInsertTool, supabaseQueryTool, supabaseUpdateTool } from './supabase'
import { tavilyExtractTool, tavilySearchTool } from './tavily'
import { ToolConfig, ToolResponse } from './types'
import { OAuthTokenPayload, ToolConfig, ToolResponse } from './types'
import { formatRequestParams, validateToolRequest } from './utils'
import { visionTool } from './vision/vision'
import { whatsappSendMessageTool } from './whatsapp'
@@ -90,7 +90,6 @@ export function getTool(toolId: string): ToolConfig | undefined {
// Check for built-in tools
const builtInTool = tools[toolId]
if (builtInTool) return builtInTool
console.log('toolId', toolId)
// Check if it's a custom tool
if (toolId.startsWith('custom_')) {
@@ -289,7 +288,6 @@ export async function executeTool(
): Promise<ToolResponse> {
try {
const tool = getTool(toolId)
console.log('tool', tool)
// Validate the tool and its parameters
validateToolRequest(toolId, tool, params)
@@ -369,7 +367,6 @@ async function handleInternalRequest(
headers: requestParams.headers,
body: requestParams.body,
})
console.log('response', response)
if (!response.ok) {
let errorData
@@ -470,14 +467,21 @@ async function handleProxyRequest(
// If we have a credential parameter, fetch the access token
if (params.credential) {
try {
// Determine if we're in a browser or server context
const isServerSide = typeof window === 'undefined'
// Prepare the token payload - include workflowId for server-side context
const tokenPayload =
isServerSide && params.workflowId
? { credentialId: params.credential, workflowId: params.workflowId }
: { credentialId: params.credential }
// Prepare the token payload
const tokenPayload: OAuthTokenPayload = {
credentialId: params.credential,
}
// Add workflowId if it exists in params or context
if (isServerSide) {
// Try to get workflowId from params or context
const workflowId = params.workflowId || params._context?.workflowId
if (workflowId) {
tokenPayload.workflowId = workflowId
}
}
const response = await fetch(`${baseUrl}/api/auth/oauth/token`, {
method: 'POST',
@@ -486,7 +490,9 @@ async function handleProxyRequest(
})
if (!response.ok) {
throw new Error('Failed to fetch access token')
const errorText = await response.text()
console.error('Token fetch failed:', response.status, errorText)
throw new Error(`Failed to fetch access token: ${response.status} ${errorText}`)
}
const data = await response.json()
@@ -494,9 +500,7 @@ async function handleProxyRequest(
// Clean up params we don't need to pass to the actual tool
delete params.credential
if (isServerSide && params.workflowId) {
delete params.workflowId
}
if (params.workflowId) delete params.workflowId
} catch (error) {
console.error('Error fetching access token:', error)
throw error

View File

@@ -60,3 +60,8 @@ export interface TableRow {
Value: string
}
}
export interface OAuthTokenPayload {
credentialId: string
workflowId?: string
}