fix(browser-agents): log live URL for stagehand and browser-use

This commit is contained in:
Waleed Latif
2025-05-16 01:29:44 -07:00
parent a66c1e3b16
commit 96fdfbc3e6
6 changed files with 162 additions and 131 deletions

View File

@@ -10,13 +10,13 @@ export const dynamic = 'force-dynamic'
const logger = createLogger('LogsCleanup')
const BATCH_SIZE = 500
const BATCH_SIZE = 2000
const S3_CONFIG = {
bucket: process.env.S3_LOGS_BUCKET_NAME || '',
region: process.env.AWS_REGION || '',
}
export async function POST(request: Request) {
export async function GET(request: Request) {
try {
const authHeader = request.headers.get('authorization')
@@ -66,89 +66,103 @@ export async function POST(request: Request) {
const workflowIds = workflowsQuery.map((w) => w.id)
const oldLogs = await db
.select({
id: workflowLogs.id,
workflowId: workflowLogs.workflowId,
executionId: workflowLogs.executionId,
level: workflowLogs.level,
message: workflowLogs.message,
duration: workflowLogs.duration,
trigger: workflowLogs.trigger,
createdAt: workflowLogs.createdAt,
metadata: workflowLogs.metadata,
})
.from(workflowLogs)
.where(
and(
inArray(workflowLogs.workflowId, workflowIds),
lt(workflowLogs.createdAt, retentionDate)
)
)
.limit(BATCH_SIZE)
logger.info(`Found ${oldLogs.length} logs older than ${retentionDate.toISOString()} to archive`)
if (oldLogs.length === 0) {
return NextResponse.json({ message: 'No logs to clean up' })
}
const results = {
total: oldLogs.length,
total: 0,
archived: 0,
archiveFailed: 0,
deleted: 0,
deleteFailed: 0,
}
for (const log of oldLogs) {
const today = new Date().toISOString().split('T')[0]
const startTime = Date.now()
const MAX_BATCHES = 10
const logKey = `archived-logs/${today}/${log.id}.json`
const logData = JSON.stringify(log)
let batchesProcessed = 0
let hasMoreLogs = true
try {
await s3Client.send(
new PutObjectCommand({
Bucket: S3_CONFIG.bucket,
Key: logKey,
Body: logData,
ContentType: 'application/json',
Metadata: {
logId: String(log.id),
workflowId: String(log.workflowId),
archivedAt: new Date().toISOString(),
},
})
while (hasMoreLogs && batchesProcessed < MAX_BATCHES) {
const oldLogs = await db
.select({
id: workflowLogs.id,
workflowId: workflowLogs.workflowId,
executionId: workflowLogs.executionId,
level: workflowLogs.level,
message: workflowLogs.message,
duration: workflowLogs.duration,
trigger: workflowLogs.trigger,
createdAt: workflowLogs.createdAt,
metadata: workflowLogs.metadata,
})
.from(workflowLogs)
.where(
and(
inArray(workflowLogs.workflowId, workflowIds),
lt(workflowLogs.createdAt, retentionDate)
)
)
.limit(BATCH_SIZE)
results.archived++
results.total += oldLogs.length
for (const log of oldLogs) {
const today = new Date().toISOString().split('T')[0]
const logKey = `archived-logs/${today}/${log.id}.json`
const logData = JSON.stringify(log)
try {
const deleteResult = await db
.delete(workflowLogs)
.where(eq(workflowLogs.id, log.id))
.returning({ id: workflowLogs.id })
await s3Client.send(
new PutObjectCommand({
Bucket: S3_CONFIG.bucket,
Key: logKey,
Body: logData,
ContentType: 'application/json',
Metadata: {
logId: String(log.id),
workflowId: String(log.workflowId),
archivedAt: new Date().toISOString(),
},
})
)
if (deleteResult.length > 0) {
results.deleted++
} else {
results.archived++
try {
const deleteResult = await db
.delete(workflowLogs)
.where(eq(workflowLogs.id, log.id))
.returning({ id: workflowLogs.id })
if (deleteResult.length > 0) {
results.deleted++
} else {
results.deleteFailed++
logger.warn(`Failed to delete log ${log.id} after archiving: No rows deleted`)
}
} catch (deleteError) {
results.deleteFailed++
logger.warn(`Failed to delete log ${log.id} after archiving: No rows deleted`)
logger.error(`Error deleting log ${log.id} after archiving:`, { deleteError })
}
} catch (deleteError) {
results.deleteFailed++
logger.error(`Error deleting log ${log.id} after archiving:`, { deleteError })
} catch (archiveError) {
results.archiveFailed++
logger.error(`Failed to archive log ${log.id}:`, { archiveError })
}
} catch (archiveError) {
results.archiveFailed++
logger.error(`Failed to archive log ${log.id}:`, { archiveError })
}
batchesProcessed++
hasMoreLogs = oldLogs.length === BATCH_SIZE
logger.info(`Processed batch ${batchesProcessed}: ${oldLogs.length} logs`)
}
const timeElapsed = (Date.now() - startTime) / 1000
const reachedLimit = batchesProcessed >= MAX_BATCHES && hasMoreLogs
return NextResponse.json({
message: `Successfully processed ${results.total} logs: archived ${results.archived}, deleted ${results.deleted}`,
message: `Processed ${batchesProcessed} batches (${results.total} logs) in ${timeElapsed.toFixed(2)}s${reachedLimit ? ' (batch limit reached)' : ''}`,
results,
complete: !hasMoreLogs,
batchLimitReached: reachedLimit,
})
} catch (error) {
logger.error('Error in log cleanup process:', { error })

View File

@@ -6,11 +6,9 @@ import { ensureZodObject, normalizeUrl } from '../utils'
const logger = createLogger('StagehandAgentAPI')
// Environment variables for Browserbase
const BROWSERBASE_API_KEY = process.env.BROWSERBASE_API_KEY
const BROWSERBASE_PROJECT_ID = process.env.BROWSERBASE_PROJECT_ID
// Input validation schema
const requestSchema = z.object({
task: z.string().min(1),
startUrl: z.string().url(),
@@ -19,16 +17,13 @@ const requestSchema = z.object({
apiKey: z.string(),
})
// Helper function to get the schema object from the input
function getSchemaObject(outputSchema: Record<string, any>): Record<string, any> {
// Check if schema is nested under a 'schema' property (common pattern)
if (outputSchema.schema && typeof outputSchema.schema === 'object') {
return outputSchema.schema
}
return outputSchema
}
// Helper function to format schema for instructions
function formatSchemaForInstructions(schema: Record<string, any>): string {
try {
return JSON.stringify(schema, null, 2)
@@ -38,7 +33,6 @@ function formatSchemaForInstructions(schema: Record<string, any>): string {
}
}
// Helper to extract special action directives with variable placeholders
function extractActionDirectives(task: string): {
processedTask: string
actionDirectives: Array<{ index: number; action: string }>
@@ -438,7 +432,6 @@ export async function POST(request: NextRequest) {
let stagehand: Stagehand | null = null
try {
// Parse and validate request body
const body = await request.json()
logger.info('Received Stagehand agent request', {
startUrl: body.startUrl,

View File

@@ -6,11 +6,9 @@ import { ensureZodObject, normalizeUrl } from '../utils'
const logger = createLogger('StagehandExtractAPI')
// Environment variables for Browserbase
const BROWSERBASE_API_KEY = process.env.BROWSERBASE_API_KEY
const BROWSERBASE_PROJECT_ID = process.env.BROWSERBASE_PROJECT_ID
// Input validation schema
const requestSchema = z.object({
instruction: z.string(),
schema: z.record(z.any()),
@@ -24,7 +22,6 @@ export async function POST(request: NextRequest) {
let stagehand = null
try {
// Parse and validate request body
const body = await request.json()
logger.info('Received extraction request', {
url: body.url,
@@ -54,7 +51,6 @@ export async function POST(request: NextRequest) {
schemaType: typeof schema,
})
// Validate schema structure
if (!schema || typeof schema !== 'object') {
logger.error('Invalid schema format', { schema })
return NextResponse.json(
@@ -63,7 +59,6 @@ export async function POST(request: NextRequest) {
)
}
// Check for required environment variables
if (!BROWSERBASE_API_KEY || !BROWSERBASE_PROJECT_ID) {
logger.error('Missing required environment variables', {
hasBrowserbaseApiKey: !!BROWSERBASE_API_KEY,
@@ -76,53 +71,43 @@ export async function POST(request: NextRequest) {
)
}
// Validate OpenAI API key format
if (!apiKey || typeof apiKey !== 'string' || !apiKey.startsWith('sk-')) {
logger.error('Invalid OpenAI API key format')
return NextResponse.json({ error: 'Invalid OpenAI API key format' }, { status: 400 })
}
try {
// Initialize Stagehand with Browserbase
logger.info('Initializing Stagehand with Browserbase')
stagehand = new Stagehand({
env: 'BROWSERBASE',
apiKey: BROWSERBASE_API_KEY,
projectId: BROWSERBASE_PROJECT_ID,
verbose: 1,
// Use a custom logger wrapper that adapts our logger to Stagehand's expected format
logger: (msg) => logger.info(typeof msg === 'string' ? msg : JSON.stringify(msg)),
disablePino: true,
modelName: 'gpt-4o',
modelClientOptions: {
apiKey: apiKey, // User's OpenAI API key
apiKey: apiKey,
},
})
// Initialize Stagehand
logger.info('Starting stagehand.init()')
await stagehand.init()
logger.info('Stagehand initialized successfully')
// Navigate to the specified URL
logger.info(`Navigating to ${url}`)
await stagehand.page.goto(url, { waitUntil: 'networkidle' })
logger.info('Navigation complete')
// Prepare for extraction
logger.info('Preparing extraction schema', {
schema: JSON.stringify(schema).substring(0, 100) + '...',
})
// Extract data using Stagehand with the raw JSON schema
logger.info('Extracting data with Stagehand')
try {
// Convert the JSON schema to a Zod schema
// First check if the schema has a nested "schema" property (common pattern)
const schemaToConvert = schema.schema || schema
// Create a Zod schema from the JSON schema
let zodSchema
try {
logger.info('Creating Zod schema from JSON schema', {
@@ -130,7 +115,6 @@ export async function POST(request: NextRequest) {
hasNestedSchema: !!schema.schema,
})
// Convert the schema to a Zod schema
zodSchema = ensureZodObject(logger, schemaToConvert)
logger.info('Successfully created Zod schema')
@@ -140,29 +124,24 @@ export async function POST(request: NextRequest) {
message: schemaError instanceof Error ? schemaError.message : 'Unknown schema error',
})
// Fall back to simple extraction without schema
logger.info('Falling back to simple extraction without schema')
zodSchema = undefined
}
// Prepare extraction options
const extractOptions: any = {
instruction,
useTextExtract: !!useTextExtract,
}
// Add schema if we have one
if (zodSchema) {
extractOptions.schema = zodSchema
}
// Add selector if provided
if (selector) {
logger.info(`Using selector: ${selector}`)
extractOptions.selector = selector
}
// Get the extracted data
logger.info('Calling stagehand.page.extract with options', {
hasInstruction: !!extractOptions.instruction,
hasSchema: !!extractOptions.schema,
@@ -170,13 +149,10 @@ export async function POST(request: NextRequest) {
useTextExtract: extractOptions.useTextExtract,
})
// Call extract based on whether we have a schema or not
let extractedData
if (zodSchema) {
// Use the full options object when we have a schema
extractedData = await stagehand.page.extract(extractOptions)
} else {
// Just pass the instruction when we don't have a schema
extractedData = await stagehand.page.extract(extractOptions.instruction)
}
@@ -186,7 +162,6 @@ export async function POST(request: NextRequest) {
dataKeys: extractedData ? Object.keys(extractedData) : [],
})
// Return the extracted data
return NextResponse.json({
data: extractedData,
schema,
@@ -206,7 +181,6 @@ export async function POST(request: NextRequest) {
stack: error instanceof Error ? error.stack : undefined,
})
// Provide more detailed error information
let errorMessage = 'Unknown error during extraction'
let errorDetails: Record<string, any> = {}
@@ -217,7 +191,6 @@ export async function POST(request: NextRequest) {
stack: error.stack,
}
// Log any additional properties that might provide context
const errorObj = error as any
if (typeof errorObj.code !== 'undefined') {
errorDetails.code = errorObj.code
@@ -252,7 +225,6 @@ export async function POST(request: NextRequest) {
{ status: 500 }
)
} finally {
// Make sure to clean up Stagehand resources
if (stagehand) {
try {
logger.info('Closing Stagehand instance')

View File

@@ -112,7 +112,7 @@ const nextConfig: NextConfig = {
},
{
key: 'Content-Security-Policy',
value: `default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval' https://*.google.com https://apis.google.com https://*.vercel-insights.com https://vercel.live https://*.vercel.live https://vercel.com https://*.vercel.app; style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; img-src 'self' data: blob: https://*.googleusercontent.com https://*.google.com https://*.atlassian.com https://cdn.discordapp.com; media-src 'self' blob:; font-src 'self' https://fonts.gstatic.com; connect-src 'self' ${process.env.OLLAMA_HOST || 'http://localhost:11434'} https://*.googleapis.com https://*.amazonaws.com https://*.s3.amazonaws.com https://*.vercel-insights.com https://*.atlassian.com https://vercel.live https://*.vercel.live https://vercel.com https://*.vercel.app; frame-src https://drive.google.com https://*.google.com; frame-ancestors 'self'; form-action 'self'; base-uri 'self'; object-src 'none'`,
value: `default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval' https://*.google.com https://apis.google.com https://*.vercel-scripts.com https://*.vercel-insights.com https://vercel.live https://*.vercel.live https://vercel.com https://*.vercel.app; style-src 'self' 'unsafe-inline' https://fonts.googleapis.com; img-src 'self' data: blob: https://*.googleusercontent.com https://*.google.com https://*.atlassian.com https://cdn.discordapp.com; media-src 'self' blob:; font-src 'self' https://fonts.gstatic.com; connect-src 'self' ${process.env.OLLAMA_HOST || 'http://localhost:11434'} https://api.browser-use.com https://*.googleapis.com https://*.amazonaws.com https://*.s3.amazonaws.com https://*.vercel-insights.com https://*.atlassian.com https://vercel.live https://*.vercel.live https://vercel.com https://*.vercel.app; frame-src https://drive.google.com https://*.google.com; frame-ancestors 'self'; form-action 'self'; base-uri 'self'; object-src 'none'`,
},
],
},

View File

@@ -77,14 +77,35 @@ export const runTaskTool: ToolConfig<BrowserUseRunTaskParams, BrowserUseRunTaskR
}
const taskId = result.output.id
let liveUrlLogged = false
try {
const initialTaskResponse = await fetch(`https://api.browser-use.com/api/v1/task/${taskId}`, {
method: 'GET',
headers: {
Authorization: `Bearer ${params.apiKey}`,
},
})
if (initialTaskResponse.ok) {
const initialTaskData = await initialTaskResponse.json()
if (initialTaskData.live_url) {
logger.info(
`BrowserUse task ${taskId} launched with live URL: ${initialTaskData.live_url}`
)
result.output.live_url = initialTaskData.live_url
liveUrlLogged = true
}
}
} catch (error) {
logger.warn(`Failed to get initial task details for ${taskId}:`, error)
}
// Validate pollInterval (minimum 1000ms, fallback to default if invalid)
const pollInterval =
typeof params.pollInterval === 'number' && params.pollInterval >= 1000
? params.pollInterval
: 5000
// Validate maxPollTime (minimum 5000ms, fallback to default if invalid)
const maxPollTime =
typeof params.maxPollTime === 'number' && params.maxPollTime >= 5000
? params.maxPollTime
@@ -92,40 +113,64 @@ export const runTaskTool: ToolConfig<BrowserUseRunTaskParams, BrowserUseRunTaskR
let elapsedTime = 0
// Poll until task is finished, failed, or max poll time is reached
while (elapsedTime < maxPollTime) {
try {
// Fetch task status
const taskResponse = await fetch(`https://api.browser-use.com/api/v1/task/${taskId}`, {
method: 'GET',
headers: {
Authorization: `Bearer ${params.apiKey}`,
},
})
if (!taskResponse.ok) {
if (taskResponse.status === 422) {
const errorData = await taskResponse.json()
throw new Error(JSON.stringify(errorData))
const statusResponse = await fetch(
`https://api.browser-use.com/api/v1/task/${taskId}/status`,
{
method: 'GET',
headers: {
Authorization: `Bearer ${params.apiKey}`,
},
}
throw new Error(`Failed to get task status: ${taskResponse.statusText}`)
)
if (!statusResponse.ok) {
throw new Error(`Failed to get task status: ${statusResponse.statusText}`)
}
const taskData = await taskResponse.json()
const status = await statusResponse.json()
result.output.status = status
// Update the response with the latest task data
result.output = taskData as BrowserUseTaskOutput
logger.info(`BrowserUse task ${taskId} status: ${status}`)
if (['finished', 'failed', 'stopped'].includes(status)) {
const taskResponse = await fetch(`https://api.browser-use.com/api/v1/task/${taskId}`, {
method: 'GET',
headers: {
Authorization: `Bearer ${params.apiKey}`,
},
})
if (taskResponse.ok) {
const taskData = await taskResponse.json()
result.output = taskData as BrowserUseTaskOutput
}
// Check if the task has completed
if (['finished', 'failed', 'stopped'].includes(taskData.status)) {
return result
}
// Wait for the poll interval
if (!liveUrlLogged && status === 'running') {
const taskResponse = await fetch(`https://api.browser-use.com/api/v1/task/${taskId}`, {
method: 'GET',
headers: {
Authorization: `Bearer ${params.apiKey}`,
},
})
if (taskResponse.ok) {
const taskData = await taskResponse.json()
if (taskData.live_url) {
logger.info(`BrowserUse task ${taskId} running with live URL: ${taskData.live_url}`)
result.output.live_url = taskData.live_url
liveUrlLogged = true
}
}
}
await new Promise((resolve) => setTimeout(resolve, pollInterval))
elapsedTime += pollInterval
} catch (error: any) {
// If there's an error polling, return the last successful result
logger.error('Error polling for task status:', {
message: error.message || 'Unknown error',
taskId,
@@ -138,7 +183,6 @@ export const runTaskTool: ToolConfig<BrowserUseRunTaskParams, BrowserUseRunTaskR
}
}
// If we've reached max poll time without completion
logger.warn(
`Task ${taskId} did not complete within the maximum polling time (${maxPollTime / 1000}s)`
)

View File

@@ -47,13 +47,21 @@ export const agentTool: ToolConfig<StagehandAgentParams, StagehandAgentResponse>
headers: () => ({
'Content-Type': 'application/json',
}),
body: (params) => ({
task: params.task,
startUrl: params.startUrl,
outputSchema: params.outputSchema,
variables: params.variables,
apiKey: params.apiKey,
}),
body: (params) => {
let startUrl = params.startUrl
if (startUrl && !startUrl.match(/^https?:\/\//i)) {
startUrl = `https://${startUrl.trim()}`
logger.info(`Normalized URL from ${params.startUrl} to ${startUrl}`)
}
return {
task: params.task,
startUrl: startUrl,
outputSchema: params.outputSchema,
variables: params.variables,
apiKey: params.apiKey,
}
},
},
// Transform the response