diff --git a/apps/sim/app/api/logs/cleanup/route.ts b/apps/sim/app/api/logs/cleanup/route.ts index 058e316b4e..4c8a0c949f 100644 --- a/apps/sim/app/api/logs/cleanup/route.ts +++ b/apps/sim/app/api/logs/cleanup/route.ts @@ -10,13 +10,13 @@ export const dynamic = 'force-dynamic' const logger = createLogger('LogsCleanup') -const BATCH_SIZE = 500 +const BATCH_SIZE = 2000 const S3_CONFIG = { bucket: process.env.S3_LOGS_BUCKET_NAME || '', region: process.env.AWS_REGION || '', } -export async function POST(request: Request) { +export async function GET(request: Request) { try { const authHeader = request.headers.get('authorization') @@ -66,89 +66,103 @@ export async function POST(request: Request) { const workflowIds = workflowsQuery.map((w) => w.id) - const oldLogs = await db - .select({ - id: workflowLogs.id, - workflowId: workflowLogs.workflowId, - executionId: workflowLogs.executionId, - level: workflowLogs.level, - message: workflowLogs.message, - duration: workflowLogs.duration, - trigger: workflowLogs.trigger, - createdAt: workflowLogs.createdAt, - metadata: workflowLogs.metadata, - }) - .from(workflowLogs) - .where( - and( - inArray(workflowLogs.workflowId, workflowIds), - lt(workflowLogs.createdAt, retentionDate) - ) - ) - .limit(BATCH_SIZE) - - logger.info(`Found ${oldLogs.length} logs older than ${retentionDate.toISOString()} to archive`) - - if (oldLogs.length === 0) { - return NextResponse.json({ message: 'No logs to clean up' }) - } - const results = { - total: oldLogs.length, + total: 0, archived: 0, archiveFailed: 0, deleted: 0, deleteFailed: 0, } - for (const log of oldLogs) { - const today = new Date().toISOString().split('T')[0] + const startTime = Date.now() + const MAX_BATCHES = 10 - const logKey = `archived-logs/${today}/${log.id}.json` - const logData = JSON.stringify(log) + let batchesProcessed = 0 + let hasMoreLogs = true - try { - await s3Client.send( - new PutObjectCommand({ - Bucket: S3_CONFIG.bucket, - Key: logKey, - Body: logData, - ContentType: 'application/json', - Metadata: { - logId: String(log.id), - workflowId: String(log.workflowId), - archivedAt: new Date().toISOString(), - }, - }) + while (hasMoreLogs && batchesProcessed < MAX_BATCHES) { + const oldLogs = await db + .select({ + id: workflowLogs.id, + workflowId: workflowLogs.workflowId, + executionId: workflowLogs.executionId, + level: workflowLogs.level, + message: workflowLogs.message, + duration: workflowLogs.duration, + trigger: workflowLogs.trigger, + createdAt: workflowLogs.createdAt, + metadata: workflowLogs.metadata, + }) + .from(workflowLogs) + .where( + and( + inArray(workflowLogs.workflowId, workflowIds), + lt(workflowLogs.createdAt, retentionDate) + ) ) + .limit(BATCH_SIZE) - results.archived++ + results.total += oldLogs.length + + for (const log of oldLogs) { + const today = new Date().toISOString().split('T')[0] + + const logKey = `archived-logs/${today}/${log.id}.json` + const logData = JSON.stringify(log) try { - const deleteResult = await db - .delete(workflowLogs) - .where(eq(workflowLogs.id, log.id)) - .returning({ id: workflowLogs.id }) + await s3Client.send( + new PutObjectCommand({ + Bucket: S3_CONFIG.bucket, + Key: logKey, + Body: logData, + ContentType: 'application/json', + Metadata: { + logId: String(log.id), + workflowId: String(log.workflowId), + archivedAt: new Date().toISOString(), + }, + }) + ) - if (deleteResult.length > 0) { - results.deleted++ - } else { + results.archived++ + + try { + const deleteResult = await db + .delete(workflowLogs) + .where(eq(workflowLogs.id, log.id)) + .returning({ id: workflowLogs.id }) + + if (deleteResult.length > 0) { + results.deleted++ + } else { + results.deleteFailed++ + logger.warn(`Failed to delete log ${log.id} after archiving: No rows deleted`) + } + } catch (deleteError) { results.deleteFailed++ - logger.warn(`Failed to delete log ${log.id} after archiving: No rows deleted`) + logger.error(`Error deleting log ${log.id} after archiving:`, { deleteError }) } - } catch (deleteError) { - results.deleteFailed++ - logger.error(`Error deleting log ${log.id} after archiving:`, { deleteError }) + } catch (archiveError) { + results.archiveFailed++ + logger.error(`Failed to archive log ${log.id}:`, { archiveError }) } - } catch (archiveError) { - results.archiveFailed++ - logger.error(`Failed to archive log ${log.id}:`, { archiveError }) } + + batchesProcessed++ + hasMoreLogs = oldLogs.length === BATCH_SIZE + + logger.info(`Processed batch ${batchesProcessed}: ${oldLogs.length} logs`) } + const timeElapsed = (Date.now() - startTime) / 1000 + const reachedLimit = batchesProcessed >= MAX_BATCHES && hasMoreLogs + return NextResponse.json({ - message: `Successfully processed ${results.total} logs: archived ${results.archived}, deleted ${results.deleted}`, + message: `Processed ${batchesProcessed} batches (${results.total} logs) in ${timeElapsed.toFixed(2)}s${reachedLimit ? ' (batch limit reached)' : ''}`, results, + complete: !hasMoreLogs, + batchLimitReached: reachedLimit, }) } catch (error) { logger.error('Error in log cleanup process:', { error }) diff --git a/apps/sim/app/api/tools/stagehand/agent/route.ts b/apps/sim/app/api/tools/stagehand/agent/route.ts index 182a69b077..3df0ec6501 100644 --- a/apps/sim/app/api/tools/stagehand/agent/route.ts +++ b/apps/sim/app/api/tools/stagehand/agent/route.ts @@ -6,11 +6,9 @@ import { ensureZodObject, normalizeUrl } from '../utils' const logger = createLogger('StagehandAgentAPI') -// Environment variables for Browserbase const BROWSERBASE_API_KEY = process.env.BROWSERBASE_API_KEY const BROWSERBASE_PROJECT_ID = process.env.BROWSERBASE_PROJECT_ID -// Input validation schema const requestSchema = z.object({ task: z.string().min(1), startUrl: z.string().url(), @@ -19,16 +17,13 @@ const requestSchema = z.object({ apiKey: z.string(), }) -// Helper function to get the schema object from the input function getSchemaObject(outputSchema: Record): Record { - // Check if schema is nested under a 'schema' property (common pattern) if (outputSchema.schema && typeof outputSchema.schema === 'object') { return outputSchema.schema } return outputSchema } -// Helper function to format schema for instructions function formatSchemaForInstructions(schema: Record): string { try { return JSON.stringify(schema, null, 2) @@ -38,7 +33,6 @@ function formatSchemaForInstructions(schema: Record): string { } } -// Helper to extract special action directives with variable placeholders function extractActionDirectives(task: string): { processedTask: string actionDirectives: Array<{ index: number; action: string }> @@ -438,7 +432,6 @@ export async function POST(request: NextRequest) { let stagehand: Stagehand | null = null try { - // Parse and validate request body const body = await request.json() logger.info('Received Stagehand agent request', { startUrl: body.startUrl, diff --git a/apps/sim/app/api/tools/stagehand/extract/route.ts b/apps/sim/app/api/tools/stagehand/extract/route.ts index 810fa54415..0b5f432bd6 100644 --- a/apps/sim/app/api/tools/stagehand/extract/route.ts +++ b/apps/sim/app/api/tools/stagehand/extract/route.ts @@ -6,11 +6,9 @@ import { ensureZodObject, normalizeUrl } from '../utils' const logger = createLogger('StagehandExtractAPI') -// Environment variables for Browserbase const BROWSERBASE_API_KEY = process.env.BROWSERBASE_API_KEY const BROWSERBASE_PROJECT_ID = process.env.BROWSERBASE_PROJECT_ID -// Input validation schema const requestSchema = z.object({ instruction: z.string(), schema: z.record(z.any()), @@ -24,7 +22,6 @@ export async function POST(request: NextRequest) { let stagehand = null try { - // Parse and validate request body const body = await request.json() logger.info('Received extraction request', { url: body.url, @@ -54,7 +51,6 @@ export async function POST(request: NextRequest) { schemaType: typeof schema, }) - // Validate schema structure if (!schema || typeof schema !== 'object') { logger.error('Invalid schema format', { schema }) return NextResponse.json( @@ -63,7 +59,6 @@ export async function POST(request: NextRequest) { ) } - // Check for required environment variables if (!BROWSERBASE_API_KEY || !BROWSERBASE_PROJECT_ID) { logger.error('Missing required environment variables', { hasBrowserbaseApiKey: !!BROWSERBASE_API_KEY, @@ -76,53 +71,43 @@ export async function POST(request: NextRequest) { ) } - // Validate OpenAI API key format if (!apiKey || typeof apiKey !== 'string' || !apiKey.startsWith('sk-')) { logger.error('Invalid OpenAI API key format') return NextResponse.json({ error: 'Invalid OpenAI API key format' }, { status: 400 }) } try { - // Initialize Stagehand with Browserbase logger.info('Initializing Stagehand with Browserbase') stagehand = new Stagehand({ env: 'BROWSERBASE', apiKey: BROWSERBASE_API_KEY, projectId: BROWSERBASE_PROJECT_ID, verbose: 1, - // Use a custom logger wrapper that adapts our logger to Stagehand's expected format logger: (msg) => logger.info(typeof msg === 'string' ? msg : JSON.stringify(msg)), disablePino: true, modelName: 'gpt-4o', modelClientOptions: { - apiKey: apiKey, // User's OpenAI API key + apiKey: apiKey, }, }) - // Initialize Stagehand logger.info('Starting stagehand.init()') await stagehand.init() logger.info('Stagehand initialized successfully') - // Navigate to the specified URL logger.info(`Navigating to ${url}`) await stagehand.page.goto(url, { waitUntil: 'networkidle' }) logger.info('Navigation complete') - // Prepare for extraction logger.info('Preparing extraction schema', { schema: JSON.stringify(schema).substring(0, 100) + '...', }) - // Extract data using Stagehand with the raw JSON schema logger.info('Extracting data with Stagehand') try { - // Convert the JSON schema to a Zod schema - // First check if the schema has a nested "schema" property (common pattern) const schemaToConvert = schema.schema || schema - // Create a Zod schema from the JSON schema let zodSchema try { logger.info('Creating Zod schema from JSON schema', { @@ -130,7 +115,6 @@ export async function POST(request: NextRequest) { hasNestedSchema: !!schema.schema, }) - // Convert the schema to a Zod schema zodSchema = ensureZodObject(logger, schemaToConvert) logger.info('Successfully created Zod schema') @@ -140,29 +124,24 @@ export async function POST(request: NextRequest) { message: schemaError instanceof Error ? schemaError.message : 'Unknown schema error', }) - // Fall back to simple extraction without schema logger.info('Falling back to simple extraction without schema') zodSchema = undefined } - // Prepare extraction options const extractOptions: any = { instruction, useTextExtract: !!useTextExtract, } - // Add schema if we have one if (zodSchema) { extractOptions.schema = zodSchema } - // Add selector if provided if (selector) { logger.info(`Using selector: ${selector}`) extractOptions.selector = selector } - // Get the extracted data logger.info('Calling stagehand.page.extract with options', { hasInstruction: !!extractOptions.instruction, hasSchema: !!extractOptions.schema, @@ -170,13 +149,10 @@ export async function POST(request: NextRequest) { useTextExtract: extractOptions.useTextExtract, }) - // Call extract based on whether we have a schema or not let extractedData if (zodSchema) { - // Use the full options object when we have a schema extractedData = await stagehand.page.extract(extractOptions) } else { - // Just pass the instruction when we don't have a schema extractedData = await stagehand.page.extract(extractOptions.instruction) } @@ -186,7 +162,6 @@ export async function POST(request: NextRequest) { dataKeys: extractedData ? Object.keys(extractedData) : [], }) - // Return the extracted data return NextResponse.json({ data: extractedData, schema, @@ -206,7 +181,6 @@ export async function POST(request: NextRequest) { stack: error instanceof Error ? error.stack : undefined, }) - // Provide more detailed error information let errorMessage = 'Unknown error during extraction' let errorDetails: Record = {} @@ -217,7 +191,6 @@ export async function POST(request: NextRequest) { stack: error.stack, } - // Log any additional properties that might provide context const errorObj = error as any if (typeof errorObj.code !== 'undefined') { errorDetails.code = errorObj.code @@ -252,7 +225,6 @@ export async function POST(request: NextRequest) { { status: 500 } ) } finally { - // Make sure to clean up Stagehand resources if (stagehand) { try { logger.info('Closing Stagehand instance') diff --git a/apps/sim/next.config.ts b/apps/sim/next.config.ts index 75c5c39b0c..51b28e0f65 100644 --- a/apps/sim/next.config.ts +++ b/apps/sim/next.config.ts @@ -112,7 +112,7 @@ const nextConfig: NextConfig = { }, { key: 'Content-Security-Policy', - value: `default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval' https://*.google.com https://apis.google.com https://*.vercel-insights.com https://vercel.live https://*.vercel.live https://vercel.com https://*.vercel.app; style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; img-src 'self' data: blob: https://*.googleusercontent.com https://*.google.com https://*.atlassian.com https://cdn.discordapp.com; media-src 'self' blob:; font-src 'self' https://fonts.gstatic.com; connect-src 'self' ${process.env.OLLAMA_HOST || 'http://localhost:11434'} https://*.googleapis.com https://*.amazonaws.com https://*.s3.amazonaws.com https://*.vercel-insights.com https://*.atlassian.com https://vercel.live https://*.vercel.live https://vercel.com https://*.vercel.app; frame-src https://drive.google.com https://*.google.com; frame-ancestors 'self'; form-action 'self'; base-uri 'self'; object-src 'none'`, + value: `default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval' https://*.google.com https://apis.google.com https://*.vercel-scripts.com https://*.vercel-insights.com https://vercel.live https://*.vercel.live https://vercel.com https://*.vercel.app; style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; img-src 'self' data: blob: https://*.googleusercontent.com https://*.google.com https://*.atlassian.com https://cdn.discordapp.com; media-src 'self' blob:; font-src 'self' https://fonts.gstatic.com; connect-src 'self' ${process.env.OLLAMA_HOST || 'http://localhost:11434'} https://api.browser-use.com https://*.googleapis.com https://*.amazonaws.com https://*.s3.amazonaws.com https://*.vercel-insights.com https://*.atlassian.com https://vercel.live https://*.vercel.live https://vercel.com https://*.vercel.app; frame-src https://drive.google.com https://*.google.com; frame-ancestors 'self'; form-action 'self'; base-uri 'self'; object-src 'none'`, }, ], }, diff --git a/apps/sim/tools/browser_use/run_task.ts b/apps/sim/tools/browser_use/run_task.ts index da25b370a5..61d56cd643 100644 --- a/apps/sim/tools/browser_use/run_task.ts +++ b/apps/sim/tools/browser_use/run_task.ts @@ -77,14 +77,35 @@ export const runTaskTool: ToolConfig= 1000 ? params.pollInterval : 5000 - // Validate maxPollTime (minimum 5000ms, fallback to default if invalid) const maxPollTime = typeof params.maxPollTime === 'number' && params.maxPollTime >= 5000 ? params.maxPollTime @@ -92,40 +113,64 @@ export const runTaskTool: ToolConfig setTimeout(resolve, pollInterval)) elapsedTime += pollInterval } catch (error: any) { - // If there's an error polling, return the last successful result logger.error('Error polling for task status:', { message: error.message || 'Unknown error', taskId, @@ -138,7 +183,6 @@ export const runTaskTool: ToolConfig headers: () => ({ 'Content-Type': 'application/json', }), - body: (params) => ({ - task: params.task, - startUrl: params.startUrl, - outputSchema: params.outputSchema, - variables: params.variables, - apiKey: params.apiKey, - }), + body: (params) => { + let startUrl = params.startUrl + if (startUrl && !startUrl.match(/^https?:\/\//i)) { + startUrl = `https://${startUrl.trim()}` + logger.info(`Normalized URL from ${params.startUrl} to ${startUrl}`) + } + + return { + task: params.task, + startUrl: startUrl, + outputSchema: params.outputSchema, + variables: params.variables, + apiKey: params.apiKey, + } + }, }, // Transform the response