mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
feat(attachments): use filesystem for gmail, outlook triggers to save attachments (#1631)
* feat(outlook): add include attachment feature to outlook * add include attachments to gmail trigger * add gmail trigger, outlook block include attachments * fix rendering issue * remove comment * fix architecture * fix redeploy * pass files to logging session to surface in logs * fix gmail block parsing attachments * fix reads
This commit is contained in:
committed by
GitHub
parent
d325fdde6c
commit
b296323716
@@ -1,8 +1,7 @@
|
||||
import { apiKey, db, workflow, workflowDeploymentVersion } from '@sim/db'
|
||||
import { and, desc, eq, sql } from 'drizzle-orm'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { generateApiKey } from '@/lib/api-key/service'
|
||||
import { createLogger } from '@/lib/logs/console/logger'
|
||||
import { generateRequestId } from '@/lib/utils'
|
||||
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
|
||||
@@ -64,26 +63,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
|
||||
.orderBy(desc(apiKey.lastUsed), desc(apiKey.createdAt))
|
||||
.limit(1)
|
||||
|
||||
if (userApiKey.length === 0) {
|
||||
try {
|
||||
const newApiKeyVal = generateApiKey()
|
||||
const keyName = 'Default API Key'
|
||||
await db.insert(apiKey).values({
|
||||
id: uuidv4(),
|
||||
userId: workflowData.userId,
|
||||
workspaceId: null,
|
||||
name: keyName,
|
||||
key: newApiKeyVal,
|
||||
type: 'personal',
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
keyInfo = { name: keyName, type: 'personal' }
|
||||
logger.info(`[${requestId}] Generated new API key for user: ${workflowData.userId}`)
|
||||
} catch (keyError) {
|
||||
logger.error(`[${requestId}] Failed to generate API key:`, keyError)
|
||||
}
|
||||
} else {
|
||||
if (userApiKey.length > 0) {
|
||||
keyInfo = { name: userApiKey[0].name, type: userApiKey[0].type as 'personal' | 'workspace' }
|
||||
}
|
||||
}
|
||||
@@ -190,34 +170,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
const deployedAt = new Date()
|
||||
logger.debug(`[${requestId}] Proceeding with deployment at ${deployedAt.toISOString()}`)
|
||||
|
||||
const userApiKey = await db
|
||||
.select({
|
||||
key: apiKey.key,
|
||||
})
|
||||
.from(apiKey)
|
||||
.where(and(eq(apiKey.userId, userId), eq(apiKey.type, 'personal')))
|
||||
.orderBy(desc(apiKey.lastUsed), desc(apiKey.createdAt))
|
||||
.limit(1)
|
||||
|
||||
if (userApiKey.length === 0) {
|
||||
try {
|
||||
const newApiKey = generateApiKey()
|
||||
await db.insert(apiKey).values({
|
||||
id: uuidv4(),
|
||||
userId,
|
||||
workspaceId: null,
|
||||
name: 'Default API Key',
|
||||
key: newApiKey,
|
||||
type: 'personal',
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
logger.info(`[${requestId}] Generated new API key for user: ${userId}`)
|
||||
} catch (keyError) {
|
||||
logger.error(`[${requestId}] Failed to generate API key:`, keyError)
|
||||
}
|
||||
}
|
||||
|
||||
let keyInfo: { name: string; type: 'personal' | 'workspace' } | null = null
|
||||
let matchedKey: {
|
||||
id: string
|
||||
@@ -226,13 +178,50 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
type: 'personal' | 'workspace'
|
||||
} | null = null
|
||||
|
||||
if (providedApiKey) {
|
||||
let isValidKey = false
|
||||
// Use provided API key, or fall back to existing pinned API key for redeployment
|
||||
const apiKeyToUse = providedApiKey || workflowData!.pinnedApiKeyId
|
||||
|
||||
const currentUserId = session?.user?.id
|
||||
if (!apiKeyToUse) {
|
||||
return NextResponse.json(
|
||||
{ error: 'API key is required. Please create or select an API key before deploying.' },
|
||||
{ status: 400 }
|
||||
)
|
||||
}
|
||||
|
||||
if (currentUserId) {
|
||||
const [personalKey] = await db
|
||||
let isValidKey = false
|
||||
|
||||
const currentUserId = session?.user?.id
|
||||
|
||||
if (currentUserId) {
|
||||
const [personalKey] = await db
|
||||
.select({
|
||||
id: apiKey.id,
|
||||
key: apiKey.key,
|
||||
name: apiKey.name,
|
||||
expiresAt: apiKey.expiresAt,
|
||||
})
|
||||
.from(apiKey)
|
||||
.where(
|
||||
and(
|
||||
eq(apiKey.id, apiKeyToUse),
|
||||
eq(apiKey.userId, currentUserId),
|
||||
eq(apiKey.type, 'personal')
|
||||
)
|
||||
)
|
||||
.limit(1)
|
||||
|
||||
if (personalKey) {
|
||||
if (!personalKey.expiresAt || personalKey.expiresAt >= new Date()) {
|
||||
matchedKey = { ...personalKey, type: 'personal' }
|
||||
isValidKey = true
|
||||
keyInfo = { name: personalKey.name, type: 'personal' }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!isValidKey) {
|
||||
if (workflowData!.workspaceId) {
|
||||
const [workspaceKey] = await db
|
||||
.select({
|
||||
id: apiKey.id,
|
||||
key: apiKey.key,
|
||||
@@ -242,55 +231,26 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
.from(apiKey)
|
||||
.where(
|
||||
and(
|
||||
eq(apiKey.id, providedApiKey),
|
||||
eq(apiKey.userId, currentUserId),
|
||||
eq(apiKey.type, 'personal')
|
||||
eq(apiKey.id, apiKeyToUse),
|
||||
eq(apiKey.workspaceId, workflowData!.workspaceId),
|
||||
eq(apiKey.type, 'workspace')
|
||||
)
|
||||
)
|
||||
.limit(1)
|
||||
|
||||
if (personalKey) {
|
||||
if (!personalKey.expiresAt || personalKey.expiresAt >= new Date()) {
|
||||
matchedKey = { ...personalKey, type: 'personal' }
|
||||
if (workspaceKey) {
|
||||
if (!workspaceKey.expiresAt || workspaceKey.expiresAt >= new Date()) {
|
||||
matchedKey = { ...workspaceKey, type: 'workspace' }
|
||||
isValidKey = true
|
||||
keyInfo = { name: personalKey.name, type: 'personal' }
|
||||
keyInfo = { name: workspaceKey.name, type: 'workspace' }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!isValidKey) {
|
||||
if (workflowData!.workspaceId) {
|
||||
const [workspaceKey] = await db
|
||||
.select({
|
||||
id: apiKey.id,
|
||||
key: apiKey.key,
|
||||
name: apiKey.name,
|
||||
expiresAt: apiKey.expiresAt,
|
||||
})
|
||||
.from(apiKey)
|
||||
.where(
|
||||
and(
|
||||
eq(apiKey.id, providedApiKey),
|
||||
eq(apiKey.workspaceId, workflowData!.workspaceId),
|
||||
eq(apiKey.type, 'workspace')
|
||||
)
|
||||
)
|
||||
.limit(1)
|
||||
|
||||
if (workspaceKey) {
|
||||
if (!workspaceKey.expiresAt || workspaceKey.expiresAt >= new Date()) {
|
||||
matchedKey = { ...workspaceKey, type: 'workspace' }
|
||||
isValidKey = true
|
||||
keyInfo = { name: workspaceKey.name, type: 'workspace' }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!isValidKey) {
|
||||
logger.warn(`[${requestId}] Invalid API key ID provided for workflow deployment: ${id}`)
|
||||
return createErrorResponse('Invalid API key provided', 400)
|
||||
}
|
||||
if (!isValidKey) {
|
||||
logger.warn(`[${requestId}] Invalid API key ID provided for workflow deployment: ${id}`)
|
||||
return createErrorResponse('Invalid API key provided', 400)
|
||||
}
|
||||
|
||||
// Attribution: this route is UI-only; require session user as actor
|
||||
|
||||
@@ -361,6 +361,8 @@ export function DeployModal({
|
||||
await fetchVersions()
|
||||
} catch (error: unknown) {
|
||||
logger.error('Error deploying workflow:', { error })
|
||||
const errorMessage = error instanceof Error ? error.message : 'Failed to deploy workflow'
|
||||
setApiDeployError(errorMessage)
|
||||
} finally {
|
||||
setIsSubmitting(false)
|
||||
}
|
||||
|
||||
@@ -432,6 +432,17 @@ export const WorkflowBlock = memo(
|
||||
}
|
||||
}, [id, blockHeight, blockWidth, updateBlockLayoutMetrics, updateNodeInternals, debounce])
|
||||
|
||||
// Subscribe to this block's subblock values to track changes for conditional rendering
|
||||
const blockSubBlockValues = useSubBlockStore(
|
||||
useCallback(
|
||||
(state) => {
|
||||
if (!activeWorkflowId) return {}
|
||||
return state.workflowValues[activeWorkflowId]?.[id] || {}
|
||||
},
|
||||
[activeWorkflowId, id]
|
||||
)
|
||||
)
|
||||
|
||||
// Memoized SubBlock layout management - only recalculate when dependencies change
|
||||
const subBlockRows = useMemo(() => {
|
||||
const rows: SubBlockConfig[][] = []
|
||||
@@ -450,8 +461,7 @@ export const WorkflowBlock = memo(
|
||||
} else {
|
||||
// In normal mode, use merged state
|
||||
const blocks = useWorkflowStore.getState().blocks
|
||||
const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId || undefined
|
||||
const mergedState = mergeSubblockState(blocks, activeWorkflowId, id)[id]
|
||||
const mergedState = mergeSubblockState(blocks, activeWorkflowId || undefined, id)[id]
|
||||
stateToUse = mergedState?.subBlocks || {}
|
||||
}
|
||||
|
||||
@@ -552,6 +562,8 @@ export const WorkflowBlock = memo(
|
||||
data.subBlockValues,
|
||||
currentWorkflow.isDiffMode,
|
||||
currentBlock,
|
||||
blockSubBlockValues,
|
||||
activeWorkflowId,
|
||||
])
|
||||
|
||||
// Name editing handlers
|
||||
|
||||
@@ -10,6 +10,7 @@ import { createLogger } from '@/lib/logs/console/logger'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||
import { decryptSecret } from '@/lib/utils'
|
||||
import { WebhookAttachmentProcessor } from '@/lib/webhooks/attachment-processor'
|
||||
import { fetchAndProcessAirtablePayloads, formatWebhookInput } from '@/lib/webhooks/utils'
|
||||
import {
|
||||
loadDeployedWorkflowState,
|
||||
@@ -20,9 +21,66 @@ import { Executor } from '@/executor'
|
||||
import type { ExecutionResult } from '@/executor/types'
|
||||
import { Serializer } from '@/serializer'
|
||||
import { mergeSubblockState } from '@/stores/workflows/server-utils'
|
||||
import { getTrigger } from '@/triggers'
|
||||
|
||||
const logger = createLogger('TriggerWebhookExecution')
|
||||
|
||||
/**
|
||||
* Process trigger outputs based on their schema definitions
|
||||
* Finds outputs marked as 'file' or 'file[]' and uploads them to execution storage
|
||||
*/
|
||||
async function processTriggerFileOutputs(
|
||||
input: any,
|
||||
triggerOutputs: Record<string, any>,
|
||||
context: {
|
||||
workspaceId: string
|
||||
workflowId: string
|
||||
executionId: string
|
||||
requestId: string
|
||||
},
|
||||
path = ''
|
||||
): Promise<any> {
|
||||
if (!input || typeof input !== 'object') {
|
||||
return input
|
||||
}
|
||||
|
||||
const processed: any = Array.isArray(input) ? [] : {}
|
||||
|
||||
for (const [key, value] of Object.entries(input)) {
|
||||
const currentPath = path ? `${path}.${key}` : key
|
||||
const outputDef = triggerOutputs[key]
|
||||
const val: any = value
|
||||
|
||||
// If this field is marked as file or file[], process it
|
||||
if (outputDef?.type === 'file[]' && Array.isArray(val)) {
|
||||
try {
|
||||
processed[key] = await WebhookAttachmentProcessor.processAttachments(val as any, context)
|
||||
} catch (error) {
|
||||
processed[key] = []
|
||||
}
|
||||
} else if (outputDef?.type === 'file' && val) {
|
||||
try {
|
||||
const [processedFile] = await WebhookAttachmentProcessor.processAttachments(
|
||||
[val as any],
|
||||
context
|
||||
)
|
||||
processed[key] = processedFile
|
||||
} catch (error) {
|
||||
logger.error(`[${context.requestId}] Error processing ${currentPath}:`, error)
|
||||
processed[key] = val
|
||||
}
|
||||
} else if (outputDef && typeof outputDef === 'object' && !outputDef.type) {
|
||||
// Nested object in schema - recurse with the nested schema
|
||||
processed[key] = await processTriggerFileOutputs(val, outputDef, context, currentPath)
|
||||
} else {
|
||||
// Not a file output - keep as is
|
||||
processed[key] = val
|
||||
}
|
||||
}
|
||||
|
||||
return processed
|
||||
}
|
||||
|
||||
export type WebhookExecutionPayload = {
|
||||
webhookId: string
|
||||
workflowId: string
|
||||
@@ -250,6 +308,7 @@ async function executeWebhookJobInternal(
|
||||
totalDurationMs: totalDuration || 0,
|
||||
finalOutput: executionResult.output || {},
|
||||
traceSpans: traceSpans as any,
|
||||
workflowInput: airtableInput,
|
||||
})
|
||||
|
||||
return {
|
||||
@@ -312,6 +371,32 @@ async function executeWebhookJobInternal(
|
||||
}
|
||||
}
|
||||
|
||||
// Process trigger file outputs based on schema
|
||||
if (input && payload.blockId && blocks[payload.blockId]) {
|
||||
try {
|
||||
const triggerBlock = blocks[payload.blockId]
|
||||
const triggerId = triggerBlock?.subBlocks?.triggerId?.value
|
||||
|
||||
if (triggerId && typeof triggerId === 'string') {
|
||||
const triggerConfig = getTrigger(triggerId)
|
||||
|
||||
if (triggerConfig?.outputs) {
|
||||
logger.debug(`[${requestId}] Processing trigger ${triggerId} file outputs`)
|
||||
const processedInput = await processTriggerFileOutputs(input, triggerConfig.outputs, {
|
||||
workspaceId: workspaceId || '',
|
||||
workflowId: payload.workflowId,
|
||||
executionId,
|
||||
requestId,
|
||||
})
|
||||
Object.assign(input, processedInput)
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error processing trigger file outputs:`, error)
|
||||
// Continue without processing attachments rather than failing execution
|
||||
}
|
||||
}
|
||||
|
||||
// Create executor and execute
|
||||
const executor = new Executor({
|
||||
workflow: serializedWorkflow,
|
||||
@@ -367,6 +452,7 @@ async function executeWebhookJobInternal(
|
||||
totalDurationMs: totalDuration || 0,
|
||||
finalOutput: executionResult.output || {},
|
||||
traceSpans: traceSpans as any,
|
||||
workflowInput: input,
|
||||
})
|
||||
|
||||
return {
|
||||
|
||||
@@ -173,6 +173,13 @@ export const OutlookBlock: BlockConfig<OutlookResponse> = {
|
||||
placeholder: 'Number of emails to retrieve (default: 1, max: 10)',
|
||||
condition: { field: 'operation', value: 'read_outlook' },
|
||||
},
|
||||
{
|
||||
id: 'includeAttachments',
|
||||
title: 'Include Attachments',
|
||||
type: 'switch',
|
||||
layout: 'full',
|
||||
condition: { field: 'operation', value: 'read_outlook' },
|
||||
},
|
||||
// TRIGGER MODE: Trigger configuration (only shown when trigger mode is active)
|
||||
{
|
||||
id: 'triggerConfig',
|
||||
@@ -231,6 +238,7 @@ export const OutlookBlock: BlockConfig<OutlookResponse> = {
|
||||
folder: { type: 'string', description: 'Email folder' },
|
||||
manualFolder: { type: 'string', description: 'Manual folder name' },
|
||||
maxResults: { type: 'number', description: 'Maximum emails' },
|
||||
includeAttachments: { type: 'boolean', description: 'Include email attachments' },
|
||||
},
|
||||
outputs: {
|
||||
// Common outputs
|
||||
@@ -255,6 +263,10 @@ export const OutlookBlock: BlockConfig<OutlookResponse> = {
|
||||
receivedDateTime: { type: 'string', description: 'Email received timestamp' },
|
||||
sentDateTime: { type: 'string', description: 'Email sent timestamp' },
|
||||
hasAttachments: { type: 'boolean', description: 'Whether email has attachments' },
|
||||
attachments: {
|
||||
type: 'json',
|
||||
description: 'Email attachments (if includeAttachments is enabled)',
|
||||
},
|
||||
isRead: { type: 'boolean', description: 'Whether email is read' },
|
||||
importance: { type: 'string', description: 'Email importance level' },
|
||||
// Trigger outputs
|
||||
|
||||
107
apps/sim/lib/webhooks/attachment-processor.ts
Normal file
107
apps/sim/lib/webhooks/attachment-processor.ts
Normal file
@@ -0,0 +1,107 @@
|
||||
import { createLogger } from '@/lib/logs/console/logger'
|
||||
import { uploadExecutionFile } from '@/lib/workflows/execution-file-storage'
|
||||
import type { UserFile } from '@/executor/types'
|
||||
|
||||
const logger = createLogger('WebhookAttachmentProcessor')
|
||||
|
||||
export interface WebhookAttachment {
|
||||
name: string
|
||||
data: Buffer
|
||||
contentType: string
|
||||
size: number
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes webhook/trigger attachments and converts them to UserFile objects.
|
||||
* This enables triggers to include file attachments that get automatically stored
|
||||
* in the execution filesystem and made available as UserFile objects for workflow use.
|
||||
*/
|
||||
export class WebhookAttachmentProcessor {
|
||||
/**
|
||||
* Process attachments and upload them to execution storage
|
||||
*/
|
||||
static async processAttachments(
|
||||
attachments: WebhookAttachment[],
|
||||
executionContext: {
|
||||
workspaceId: string
|
||||
workflowId: string
|
||||
executionId: string
|
||||
requestId: string
|
||||
}
|
||||
): Promise<UserFile[]> {
|
||||
if (!attachments || attachments.length === 0) {
|
||||
return []
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`[${executionContext.requestId}] Processing ${attachments.length} attachments for execution ${executionContext.executionId}`
|
||||
)
|
||||
|
||||
const processedFiles: UserFile[] = []
|
||||
|
||||
for (const attachment of attachments) {
|
||||
try {
|
||||
const userFile = await WebhookAttachmentProcessor.processAttachment(
|
||||
attachment,
|
||||
executionContext
|
||||
)
|
||||
processedFiles.push(userFile)
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`[${executionContext.requestId}] Error processing attachment '${attachment.name}':`,
|
||||
error
|
||||
)
|
||||
// Continue with other attachments rather than failing the entire request
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`[${executionContext.requestId}] Successfully processed ${processedFiles.length}/${attachments.length} attachments`
|
||||
)
|
||||
|
||||
return processedFiles
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a single attachment and upload to execution storage
|
||||
*/
|
||||
private static async processAttachment(
|
||||
attachment: WebhookAttachment,
|
||||
executionContext: {
|
||||
workspaceId: string
|
||||
workflowId: string
|
||||
executionId: string
|
||||
requestId: string
|
||||
}
|
||||
): Promise<UserFile> {
|
||||
const data = attachment.data as any
|
||||
|
||||
if (!data || typeof data !== 'object' || data.type !== 'Buffer' || !Array.isArray(data.data)) {
|
||||
throw new Error(`Attachment '${attachment.name}' data must be a serialized Buffer`)
|
||||
}
|
||||
|
||||
const buffer = Buffer.from(data.data)
|
||||
|
||||
if (buffer.length === 0) {
|
||||
throw new Error(`Attachment '${attachment.name}' has zero bytes`)
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`[${executionContext.requestId}] Uploading attachment '${attachment.name}' (${attachment.size} bytes, ${attachment.contentType})`
|
||||
)
|
||||
|
||||
// Upload to execution storage
|
||||
const userFile = await uploadExecutionFile(
|
||||
executionContext,
|
||||
buffer,
|
||||
attachment.name,
|
||||
attachment.contentType
|
||||
)
|
||||
|
||||
logger.info(
|
||||
`[${executionContext.requestId}] Successfully stored attachment '${attachment.name}' with key: ${userFile.key}`
|
||||
)
|
||||
|
||||
return userFile
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,8 @@ import { pollingIdempotency } from '@/lib/idempotency/service'
|
||||
import { createLogger } from '@/lib/logs/console/logger'
|
||||
import { getBaseUrl } from '@/lib/urls/utils'
|
||||
import { getOAuthToken, refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/utils'
|
||||
import type { GmailAttachment } from '@/tools/gmail/types'
|
||||
import { downloadAttachments, extractAttachmentInfo } from '@/tools/gmail/utils'
|
||||
|
||||
const logger = createLogger('GmailPollingService')
|
||||
|
||||
@@ -17,6 +19,7 @@ interface GmailWebhookConfig {
|
||||
lastCheckedTimestamp?: string
|
||||
historyId?: string
|
||||
pollingInterval?: number
|
||||
includeAttachments?: boolean
|
||||
includeRawEmail?: boolean
|
||||
}
|
||||
|
||||
@@ -42,7 +45,7 @@ export interface SimplifiedEmail {
|
||||
bodyHtml: string
|
||||
labels: string[]
|
||||
hasAttachments: boolean
|
||||
attachments: Array<{ filename: string; mimeType: string; size: number }>
|
||||
attachments: GmailAttachment[]
|
||||
}
|
||||
|
||||
export interface GmailWebhookPayload {
|
||||
@@ -530,30 +533,23 @@ async function processEmails(
|
||||
date = new Date(Number.parseInt(email.internalDate)).toISOString()
|
||||
}
|
||||
|
||||
// Extract attachment information if present
|
||||
const attachments: Array<{ filename: string; mimeType: string; size: number }> = []
|
||||
// Download attachments if requested (raw Buffers - will be uploaded during execution)
|
||||
let attachments: GmailAttachment[] = []
|
||||
const hasAttachments = email.payload
|
||||
? extractAttachmentInfo(email.payload).length > 0
|
||||
: false
|
||||
|
||||
const findAttachments = (part: any) => {
|
||||
if (!part) return
|
||||
|
||||
if (part.filename && part.filename.length > 0) {
|
||||
attachments.push({
|
||||
filename: part.filename,
|
||||
mimeType: part.mimeType || 'application/octet-stream',
|
||||
size: part.body?.size || 0,
|
||||
})
|
||||
if (config.includeAttachments && hasAttachments && email.payload) {
|
||||
try {
|
||||
const attachmentInfo = extractAttachmentInfo(email.payload)
|
||||
attachments = await downloadAttachments(email.id, attachmentInfo, accessToken)
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`[${requestId}] Error downloading attachments for email ${email.id}:`,
|
||||
error
|
||||
)
|
||||
// Continue without attachments rather than failing the entire request
|
||||
}
|
||||
|
||||
// Look for attachments in nested parts
|
||||
if (part.parts && Array.isArray(part.parts)) {
|
||||
for (const subPart of part.parts) {
|
||||
findAttachments(subPart)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (email.payload) {
|
||||
findAttachments(email.payload)
|
||||
}
|
||||
|
||||
// Create simplified email object
|
||||
@@ -568,8 +564,8 @@ async function processEmails(
|
||||
bodyText: textContent,
|
||||
bodyHtml: htmlContent,
|
||||
labels: email.labelIds || [],
|
||||
hasAttachments: attachments.length > 0,
|
||||
attachments: attachments,
|
||||
hasAttachments,
|
||||
attachments,
|
||||
}
|
||||
|
||||
// Prepare webhook payload with simplified email and optionally raw email
|
||||
|
||||
@@ -18,6 +18,7 @@ interface OutlookWebhookConfig {
|
||||
maxEmailsPerPoll?: number
|
||||
lastCheckedTimestamp?: string
|
||||
pollingInterval?: number
|
||||
includeAttachments?: boolean
|
||||
includeRawEmail?: boolean
|
||||
}
|
||||
|
||||
@@ -55,6 +56,13 @@ interface OutlookEmail {
|
||||
parentFolderId: string
|
||||
}
|
||||
|
||||
export interface OutlookAttachment {
|
||||
name: string
|
||||
data: Buffer
|
||||
contentType: string
|
||||
size: number
|
||||
}
|
||||
|
||||
export interface SimplifiedOutlookEmail {
|
||||
id: string
|
||||
conversationId: string
|
||||
@@ -66,6 +74,7 @@ export interface SimplifiedOutlookEmail {
|
||||
bodyText: string
|
||||
bodyHtml: string
|
||||
hasAttachments: boolean
|
||||
attachments: OutlookAttachment[]
|
||||
isRead: boolean
|
||||
folderId: string
|
||||
// Thread support fields
|
||||
@@ -343,6 +352,18 @@ async function processOutlookEmails(
|
||||
'outlook',
|
||||
`${webhookData.id}:${email.id}`,
|
||||
async () => {
|
||||
let attachments: OutlookAttachment[] = []
|
||||
if (config.includeAttachments && email.hasAttachments) {
|
||||
try {
|
||||
attachments = await downloadOutlookAttachments(accessToken, email.id, requestId)
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`[${requestId}] Error downloading attachments for email ${email.id}:`,
|
||||
error
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Convert to simplified format
|
||||
const simplifiedEmail: SimplifiedOutlookEmail = {
|
||||
id: email.id,
|
||||
@@ -365,6 +386,7 @@ async function processOutlookEmails(
|
||||
})(),
|
||||
bodyHtml: email.body?.content || '',
|
||||
hasAttachments: email.hasAttachments,
|
||||
attachments,
|
||||
isRead: email.isRead,
|
||||
folderId: email.parentFolderId,
|
||||
// Thread support fields
|
||||
@@ -435,6 +457,68 @@ async function processOutlookEmails(
|
||||
return processedCount
|
||||
}
|
||||
|
||||
async function downloadOutlookAttachments(
|
||||
accessToken: string,
|
||||
messageId: string,
|
||||
requestId: string
|
||||
): Promise<OutlookAttachment[]> {
|
||||
const attachments: OutlookAttachment[] = []
|
||||
|
||||
try {
|
||||
// Fetch attachments list from Microsoft Graph API
|
||||
const response = await fetch(
|
||||
`https://graph.microsoft.com/v1.0/me/messages/${messageId}/attachments`,
|
||||
{
|
||||
headers: {
|
||||
Authorization: `Bearer ${accessToken}`,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
if (!response.ok) {
|
||||
logger.error(`[${requestId}] Failed to fetch attachments for message ${messageId}`)
|
||||
return attachments
|
||||
}
|
||||
|
||||
const data = await response.json()
|
||||
const attachmentsList = data.value || []
|
||||
|
||||
for (const attachment of attachmentsList) {
|
||||
try {
|
||||
// Microsoft Graph returns attachment data directly in the list response for file attachments
|
||||
if (attachment['@odata.type'] === '#microsoft.graph.fileAttachment') {
|
||||
const contentBytes = attachment.contentBytes
|
||||
if (contentBytes) {
|
||||
// contentBytes is base64 encoded
|
||||
const buffer = Buffer.from(contentBytes, 'base64')
|
||||
attachments.push({
|
||||
name: attachment.name,
|
||||
data: buffer,
|
||||
contentType: attachment.contentType,
|
||||
size: attachment.size,
|
||||
})
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`[${requestId}] Error processing attachment ${attachment.id} for message ${messageId}:`,
|
||||
error
|
||||
)
|
||||
// Continue with other attachments
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`[${requestId}] Downloaded ${attachments.length} attachments for message ${messageId}`
|
||||
)
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error downloading attachments for message ${messageId}:`, error)
|
||||
}
|
||||
|
||||
return attachments
|
||||
}
|
||||
|
||||
async function markOutlookEmailAsRead(accessToken: string, messageId: string) {
|
||||
try {
|
||||
const response = await fetch(`https://graph.microsoft.com/v1.0/me/messages/${messageId}`, {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import type { GmailReadParams, GmailToolResponse } from '@/tools/gmail/types'
|
||||
import type { GmailAttachment, GmailReadParams, GmailToolResponse } from '@/tools/gmail/types'
|
||||
import {
|
||||
createMessagesSummary,
|
||||
GMAIL_API_BASE,
|
||||
@@ -195,15 +195,32 @@ export const gmailReadTool: ToolConfig<GmailReadParams, GmailToolResponse> = {
|
||||
|
||||
const messages = await Promise.all(messagePromises)
|
||||
|
||||
// Process all messages and create a summary
|
||||
const processedMessages = messages.map(processMessageForSummary)
|
||||
// Create summary from processed messages first
|
||||
const summaryMessages = messages.map(processMessageForSummary)
|
||||
|
||||
const allAttachments: GmailAttachment[] = []
|
||||
if (params?.includeAttachments) {
|
||||
for (const msg of messages) {
|
||||
try {
|
||||
const processedResult = await processMessage(msg, params)
|
||||
if (
|
||||
processedResult.output.attachments &&
|
||||
processedResult.output.attachments.length > 0
|
||||
) {
|
||||
allAttachments.push(...processedResult.output.attachments)
|
||||
}
|
||||
} catch (error: any) {
|
||||
console.error(`Error processing message ${msg.id} for attachments:`, error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
output: {
|
||||
content: createMessagesSummary(processedMessages),
|
||||
content: createMessagesSummary(summaryMessages),
|
||||
metadata: {
|
||||
results: processedMessages.map((msg) => ({
|
||||
results: summaryMessages.map((msg) => ({
|
||||
id: msg.id,
|
||||
threadId: msg.threadId,
|
||||
subject: msg.subject,
|
||||
@@ -211,6 +228,7 @@ export const gmailReadTool: ToolConfig<GmailReadParams, GmailToolResponse> = {
|
||||
date: msg.date,
|
||||
})),
|
||||
},
|
||||
attachments: allAttachments,
|
||||
},
|
||||
}
|
||||
} catch (error: any) {
|
||||
@@ -224,6 +242,7 @@ export const gmailReadTool: ToolConfig<GmailReadParams, GmailToolResponse> = {
|
||||
threadId: msg.threadId,
|
||||
})),
|
||||
},
|
||||
attachments: [],
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type {
|
||||
CleanedOutlookMessage,
|
||||
OutlookAttachment,
|
||||
OutlookMessage,
|
||||
OutlookMessagesResponse,
|
||||
OutlookReadParams,
|
||||
@@ -7,6 +8,61 @@ import type {
|
||||
} from '@/tools/outlook/types'
|
||||
import type { ToolConfig } from '@/tools/types'
|
||||
|
||||
/**
|
||||
* Download attachments from an Outlook message
|
||||
*/
|
||||
async function downloadAttachments(
|
||||
messageId: string,
|
||||
accessToken: string
|
||||
): Promise<OutlookAttachment[]> {
|
||||
const attachments: OutlookAttachment[] = []
|
||||
|
||||
try {
|
||||
// Fetch attachments list from Microsoft Graph API
|
||||
const response = await fetch(
|
||||
`https://graph.microsoft.com/v1.0/me/messages/${messageId}/attachments`,
|
||||
{
|
||||
headers: {
|
||||
Authorization: `Bearer ${accessToken}`,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
if (!response.ok) {
|
||||
return attachments
|
||||
}
|
||||
|
||||
const data = await response.json()
|
||||
const attachmentsList = data.value || []
|
||||
|
||||
for (const attachment of attachmentsList) {
|
||||
try {
|
||||
// Microsoft Graph returns attachment data directly in the list response for file attachments
|
||||
if (attachment['@odata.type'] === '#microsoft.graph.fileAttachment') {
|
||||
const contentBytes = attachment.contentBytes
|
||||
if (contentBytes) {
|
||||
// contentBytes is base64 encoded
|
||||
const buffer = Buffer.from(contentBytes, 'base64')
|
||||
attachments.push({
|
||||
name: attachment.name,
|
||||
data: buffer,
|
||||
contentType: attachment.contentType,
|
||||
size: attachment.size,
|
||||
})
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
// Continue with other attachments
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
// Return empty array on error
|
||||
}
|
||||
|
||||
return attachments
|
||||
}
|
||||
|
||||
export const outlookReadTool: ToolConfig<OutlookReadParams, OutlookReadResponse> = {
|
||||
id: 'outlook_read',
|
||||
name: 'Outlook Read',
|
||||
@@ -37,6 +93,12 @@ export const outlookReadTool: ToolConfig<OutlookReadParams, OutlookReadResponse>
|
||||
visibility: 'user-only',
|
||||
description: 'Maximum number of emails to retrieve (default: 1, max: 10)',
|
||||
},
|
||||
includeAttachments: {
|
||||
type: 'boolean',
|
||||
required: false,
|
||||
visibility: 'user-only',
|
||||
description: 'Download and include email attachments',
|
||||
},
|
||||
},
|
||||
|
||||
request: {
|
||||
@@ -67,7 +129,7 @@ export const outlookReadTool: ToolConfig<OutlookReadParams, OutlookReadResponse>
|
||||
},
|
||||
},
|
||||
|
||||
transformResponse: async (response: Response) => {
|
||||
transformResponse: async (response: Response, params?: OutlookReadParams) => {
|
||||
const data: OutlookMessagesResponse = await response.json()
|
||||
|
||||
// Microsoft Graph API returns messages in a 'value' array
|
||||
@@ -84,44 +146,68 @@ export const outlookReadTool: ToolConfig<OutlookReadParams, OutlookReadResponse>
|
||||
}
|
||||
|
||||
// Clean up the message data to only include essential fields
|
||||
const cleanedMessages: CleanedOutlookMessage[] = messages.map((message: OutlookMessage) => ({
|
||||
id: message.id,
|
||||
subject: message.subject,
|
||||
bodyPreview: message.bodyPreview,
|
||||
body: {
|
||||
contentType: message.body?.contentType,
|
||||
content: message.body?.content,
|
||||
},
|
||||
sender: {
|
||||
name: message.sender?.emailAddress?.name,
|
||||
address: message.sender?.emailAddress?.address,
|
||||
},
|
||||
from: {
|
||||
name: message.from?.emailAddress?.name,
|
||||
address: message.from?.emailAddress?.address,
|
||||
},
|
||||
toRecipients:
|
||||
message.toRecipients?.map((recipient) => ({
|
||||
name: recipient.emailAddress?.name,
|
||||
address: recipient.emailAddress?.address,
|
||||
})) || [],
|
||||
ccRecipients:
|
||||
message.ccRecipients?.map((recipient) => ({
|
||||
name: recipient.emailAddress?.name,
|
||||
address: recipient.emailAddress?.address,
|
||||
})) || [],
|
||||
receivedDateTime: message.receivedDateTime,
|
||||
sentDateTime: message.sentDateTime,
|
||||
hasAttachments: message.hasAttachments,
|
||||
isRead: message.isRead,
|
||||
importance: message.importance,
|
||||
}))
|
||||
const cleanedMessages: CleanedOutlookMessage[] = await Promise.all(
|
||||
messages.map(async (message: OutlookMessage) => {
|
||||
// Download attachments if requested
|
||||
let attachments: OutlookAttachment[] | undefined
|
||||
if (params?.includeAttachments && message.hasAttachments && params?.accessToken) {
|
||||
try {
|
||||
attachments = await downloadAttachments(message.id, params.accessToken)
|
||||
} catch (error) {
|
||||
// Continue without attachments rather than failing the entire request
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
id: message.id,
|
||||
subject: message.subject,
|
||||
bodyPreview: message.bodyPreview,
|
||||
body: {
|
||||
contentType: message.body?.contentType,
|
||||
content: message.body?.content,
|
||||
},
|
||||
sender: {
|
||||
name: message.sender?.emailAddress?.name,
|
||||
address: message.sender?.emailAddress?.address,
|
||||
},
|
||||
from: {
|
||||
name: message.from?.emailAddress?.name,
|
||||
address: message.from?.emailAddress?.address,
|
||||
},
|
||||
toRecipients:
|
||||
message.toRecipients?.map((recipient) => ({
|
||||
name: recipient.emailAddress?.name,
|
||||
address: recipient.emailAddress?.address,
|
||||
})) || [],
|
||||
ccRecipients:
|
||||
message.ccRecipients?.map((recipient) => ({
|
||||
name: recipient.emailAddress?.name,
|
||||
address: recipient.emailAddress?.address,
|
||||
})) || [],
|
||||
receivedDateTime: message.receivedDateTime,
|
||||
sentDateTime: message.sentDateTime,
|
||||
hasAttachments: message.hasAttachments,
|
||||
attachments: attachments || [],
|
||||
isRead: message.isRead,
|
||||
importance: message.importance,
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
// Flatten all attachments from all emails to top level for FileToolProcessor
|
||||
const allAttachments: OutlookAttachment[] = []
|
||||
for (const email of cleanedMessages) {
|
||||
if (email.attachments && email.attachments.length > 0) {
|
||||
allAttachments.push(...email.attachments)
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
output: {
|
||||
message: `Successfully read ${cleanedMessages.length} email(s).`,
|
||||
results: cleanedMessages,
|
||||
attachments: allAttachments,
|
||||
},
|
||||
}
|
||||
},
|
||||
@@ -129,5 +215,6 @@ export const outlookReadTool: ToolConfig<OutlookReadParams, OutlookReadResponse>
|
||||
outputs: {
|
||||
message: { type: 'string', description: 'Success or status message' },
|
||||
results: { type: 'array', description: 'Array of email message objects' },
|
||||
attachments: { type: 'file[]', description: 'All email attachments flattened from all emails' },
|
||||
},
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ export interface OutlookReadParams {
|
||||
folder: string
|
||||
maxResults: number
|
||||
messageId?: string
|
||||
includeAttachments?: boolean
|
||||
}
|
||||
|
||||
export interface OutlookReadResponse extends ToolResponse {
|
||||
@@ -103,6 +104,14 @@ export interface OutlookMessagesResponse {
|
||||
value: OutlookMessage[]
|
||||
}
|
||||
|
||||
// Outlook attachment interface (for tool responses)
|
||||
export interface OutlookAttachment {
|
||||
name: string
|
||||
data: Buffer
|
||||
contentType: string
|
||||
size: number
|
||||
}
|
||||
|
||||
// Cleaned message interface for our response
|
||||
export interface CleanedOutlookMessage {
|
||||
id: string
|
||||
@@ -131,6 +140,7 @@ export interface CleanedOutlookMessage {
|
||||
receivedDateTime?: string
|
||||
sentDateTime?: string
|
||||
hasAttachments?: boolean
|
||||
attachments?: OutlookAttachment[] | any[]
|
||||
isRead?: boolean
|
||||
importance?: string
|
||||
}
|
||||
|
||||
@@ -38,6 +38,13 @@ export const gmailPollingTrigger: TriggerConfig = {
|
||||
description: 'Automatically mark emails as read after processing',
|
||||
required: false,
|
||||
},
|
||||
includeAttachments: {
|
||||
type: 'boolean',
|
||||
label: 'Include Attachments',
|
||||
defaultValue: false,
|
||||
description: 'Download and include email attachments in the trigger payload',
|
||||
required: false,
|
||||
},
|
||||
includeRawEmail: {
|
||||
type: 'boolean',
|
||||
label: 'Include Raw Email Data',
|
||||
@@ -94,8 +101,8 @@ export const gmailPollingTrigger: TriggerConfig = {
|
||||
description: 'Whether email has attachments',
|
||||
},
|
||||
attachments: {
|
||||
type: 'json',
|
||||
description: 'Array of attachment information',
|
||||
type: 'file[]',
|
||||
description: 'Array of email attachments as files (if includeAttachments is enabled)',
|
||||
},
|
||||
},
|
||||
timestamp: {
|
||||
@@ -129,13 +136,7 @@ export const gmailPollingTrigger: TriggerConfig = {
|
||||
'<div><p>Hello,</p><p>Please find attached the monthly report for April 2025.</p><p>Best regards,<br>Sender</p></div>',
|
||||
labels: ['INBOX', 'IMPORTANT'],
|
||||
hasAttachments: true,
|
||||
attachments: [
|
||||
{
|
||||
filename: 'report-april-2025.pdf',
|
||||
mimeType: 'application/pdf',
|
||||
size: 2048576,
|
||||
},
|
||||
],
|
||||
attachments: [],
|
||||
},
|
||||
timestamp: '2025-05-10T10:15:30.123Z',
|
||||
},
|
||||
|
||||
@@ -38,6 +38,13 @@ export const outlookPollingTrigger: TriggerConfig = {
|
||||
description: 'Automatically mark emails as read after processing',
|
||||
required: false,
|
||||
},
|
||||
includeAttachments: {
|
||||
type: 'boolean',
|
||||
label: 'Include Attachments',
|
||||
defaultValue: false,
|
||||
description: 'Download and include email attachments in the trigger payload',
|
||||
required: false,
|
||||
},
|
||||
includeRawEmail: {
|
||||
type: 'boolean',
|
||||
label: 'Include Raw Email Data',
|
||||
@@ -89,6 +96,10 @@ export const outlookPollingTrigger: TriggerConfig = {
|
||||
type: 'boolean',
|
||||
description: 'Whether email has attachments',
|
||||
},
|
||||
attachments: {
|
||||
type: 'file[]',
|
||||
description: 'Array of email attachments as files (if includeAttachments is enabled)',
|
||||
},
|
||||
isRead: {
|
||||
type: 'boolean',
|
||||
description: 'Whether email is read',
|
||||
@@ -136,6 +147,7 @@ export const outlookPollingTrigger: TriggerConfig = {
|
||||
bodyHtml:
|
||||
'<div><p>Hi Team,</p><p>Please find attached the Q1 2025 business review document. We need to discuss the results in our next meeting.</p><p>Best regards,<br>Manager</p></div>',
|
||||
hasAttachments: true,
|
||||
attachments: [],
|
||||
isRead: false,
|
||||
folderId: 'AQMkADg1OWUyZjg4LWJkNGYtNDFhYy04OGVjAC4AAAJzE3bU',
|
||||
messageId: 'AAMkADg1OWUyZjg4LWJkNGYtNDFhYy04OGVjLWVkM2VhY2YzYTcwZgBGAAAAAACE3bU',
|
||||
|
||||
Reference in New Issue
Block a user