feat(serverless): for serverless executions (webhooks & scheduled), use workflow-id based authentication instead of session based to support OAuth

This commit is contained in:
Waleed Latif
2025-03-09 17:43:50 -07:00
parent 31ec93e688
commit b8c30396cb
4 changed files with 75 additions and 15 deletions

View File

@@ -2,33 +2,57 @@ import { NextRequest, NextResponse } from 'next/server'
import { and, eq } from 'drizzle-orm'
import { getSession } from '@/lib/auth'
import { db } from '@/db'
import { account } from '@/db/schema'
import { account, workflow } from '@/db/schema'
/**
* Get an access token for a specific credential
* Supports both session-based authentication (for client-side requests)
* and workflow-based authentication (for server-side requests)
*/
export async function POST(request: NextRequest) {
try {
// Get the session
const session = await getSession()
// Check if the user is authenticated
if (!session?.user?.id) {
return NextResponse.json({ error: 'User not authenticated' }, { status: 401 })
}
// Get the credential ID from the request body
const { credentialId } = await request.json()
// Parse request body
const body = await request.json()
const { credentialId, workflowId } = body
if (!credentialId) {
return NextResponse.json({ error: 'Credential ID is required' }, { status: 400 })
}
// Determine the user ID based on the context
let userId: string | undefined
// If workflowId is provided, this is a server-side request
if (workflowId) {
// Get the workflow to verify the user ID
const workflows = await db
.select({ userId: workflow.userId })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (!workflows.length) {
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
userId = workflows[0].userId
} else {
// This is a client-side request, use the session
const session = await getSession()
// Check if the user is authenticated
if (!session?.user?.id) {
return NextResponse.json({ error: 'User not authenticated' }, { status: 401 })
}
userId = session.user.id
}
// Get the credential from the database
const credentials = await db
.select()
.from(account)
.where(and(eq(account.id, credentialId), eq(account.userId, session.user.id)))
.where(and(eq(account.id, credentialId), eq(account.userId, userId)))
.limit(1)
if (!credentials.length) {

View File

@@ -277,7 +277,18 @@ export async function GET(req: NextRequest) {
// Serialize and execute the workflow
const serializedWorkflow = new Serializer().serializeWorkflow(mergedStates, edges, loops)
const executor = new Executor(serializedWorkflow, currentBlockStates, decryptedEnvVars)
// Add workflowId to the input for OAuth credential resolution
const input = {
workflowId: schedule.workflowId,
}
const executor = new Executor(
serializedWorkflow,
currentBlockStates,
decryptedEnvVars,
input
)
const result = await executor.execute(schedule.workflowId)
// Log each execution step and the final result

View File

@@ -269,7 +269,18 @@ export async function POST(
// Serialize and execute the workflow
const serializedWorkflow = new Serializer().serializeWorkflow(mergedStates as any, edges, loops)
const executor = new Executor(serializedWorkflow, currentBlockStates, decryptedEnvVars, input)
// Add workflowId to the input for OAuth credential resolution
const enrichedInput = {
...input,
workflowId: foundWorkflow.id,
}
const executor = new Executor(
serializedWorkflow,
currentBlockStates,
decryptedEnvVars,
enrichedInput
)
const result = await executor.execute(foundWorkflow.id)
console.log(`Successfully executed workflow ${foundWorkflow.id}`)

View File

@@ -470,10 +470,19 @@ async function handleProxyRequest(
// If we have a credential parameter, fetch the access token
if (params.credential) {
try {
// Determine if we're in a browser or server context
const isServerSide = typeof window === 'undefined'
// Prepare the token payload - include workflowId for server-side context
const tokenPayload =
isServerSide && params.workflowId
? { credentialId: params.credential, workflowId: params.workflowId }
: { credentialId: params.credential }
const response = await fetch(`${baseUrl}/api/auth/oauth/token`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ credentialId: params.credential }),
body: JSON.stringify(tokenPayload),
})
if (!response.ok) {
@@ -482,7 +491,12 @@ async function handleProxyRequest(
const data = await response.json()
params.accessToken = data.accessToken
// Clean up params we don't need to pass to the actual tool
delete params.credential
if (isServerSide && params.workflowId) {
delete params.workflowId
}
} catch (error) {
console.error('Error fetching access token:', error)
throw error