fix(airtable): refactored to remove redis bc of errors w serverless

This commit is contained in:
Waleed Latif
2025-04-06 22:52:38 -07:00
parent ca4055130f
commit 10641bebdd

View File

@@ -347,12 +347,6 @@ async function fetchAndProcessAirtablePayloads(
const consolidatedChangesMap = new Map<string, AirtableChange>()
let localProviderConfig = { ...((webhookData.providerConfig as Record<string, any>) || {}) } // Local copy
// Add log at the very beginning of the try block
logger.info(`[${requestId}] Entering fetchAndProcessAirtablePayloads try block`, {
webhookId: webhookData.id,
workflowId: workflowData.id,
})
try {
// --- Essential IDs & Config from localProviderConfig ---
const baseId = localProviderConfig.baseId
@@ -373,28 +367,14 @@ async function fetchAndProcessAirtablePayloads(
// --- Retrieve Stored Cursor from localProviderConfig ---
const storedCursor = localProviderConfig.externalWebhookCursor
logger.info(`[${requestId}] Retrieved providerConfig for cursor check`, {
webhookId: webhookData.id,
storedCursorValue: storedCursor,
type: typeof storedCursor,
})
// IMPORTANT FIX: Initialize cursor in provider config if missing
// Initialize cursor in provider config if missing
if (storedCursor === undefined || storedCursor === null) {
logger.info(`[${requestId}] Cursor is missing in providerConfig, initializing it`, {
webhookId: webhookData.id,
})
// Update the local copy
localProviderConfig.externalWebhookCursor = null
// Add cursor to the database immediately to fix the configuration
try {
// Add log before the DB update
logger.info(`[${requestId}] Attempting to initialize cursor in DB`, {
webhookId: webhookData.id,
configToSave: { ...localProviderConfig, externalWebhookCursor: null },
})
await db
.update(webhook)
.set({
@@ -404,9 +384,6 @@ async function fetchAndProcessAirtablePayloads(
.where(eq(webhook.id, webhookData.id))
localProviderConfig.externalWebhookCursor = null // Update local copy too
logger.info(`[${requestId}] Successfully initialized cursor in DB`, {
webhookId: webhookData.id,
})
} catch (initError: any) {
logger.error(`[${requestId}] Failed to initialize cursor in DB`, {
webhookId: webhookData.id,
@@ -425,22 +402,11 @@ async function fetchAndProcessAirtablePayloads(
if (storedCursor && typeof storedCursor === 'number') {
currentCursor = storedCursor
logger.info(`[${requestId}] Using stored cursor`, {
webhookId: webhookData.id,
cursor: currentCursor,
})
} else {
logger.info(`[${requestId}] No valid stored cursor found, starting poll`, {
webhookId: webhookData.id,
})
currentCursor = null // Airtable API defaults to 1 if omitted
}
// --- Get OAuth Token ---
logger.info(`[${requestId}] Attempting to get OAuth token`, {
userId: workflowData.userId,
provider: 'airtable',
})
let accessToken: string | null = null
try {
accessToken = await getOAuthToken(workflowData.userId, 'airtable')
@@ -451,9 +417,6 @@ async function fetchAndProcessAirtablePayloads(
)
throw new Error('Airtable access token not found.')
}
logger.info(`[${requestId}] Successfully obtained Airtable access token`, {
userId: workflowData.userId,
})
} catch (tokenError: any) {
logger.error(
`[${requestId}] Failed to get Airtable OAuth token for user ${workflowData.userId}`,
@@ -489,10 +452,6 @@ async function fetchAndProcessAirtablePayloads(
queryParams.set('cursor', currentCursor.toString())
}
const fullUrl = `${apiUrl}?${queryParams.toString()}`
logger.info(`[${requestId}] Calling Airtable GET /payloads (Call ${apiCallCount})`, {
url: apiUrl,
cursor: currentCursor,
})
try {
const response = await fetch(fullUrl, {
@@ -521,14 +480,6 @@ async function fetchAndProcessAirtablePayloads(
}
const receivedPayloads = responseBody.payloads || []
logger.info(
`[${requestId}] Received response from Airtable /payloads (Call ${apiCallCount})`,
{
payloadCount: receivedPayloads.length,
mightHaveMore: responseBody.mightHaveMore,
nextCursor: responseBody.cursor,
}
)
// --- Process and Consolidate Changes ---
if (receivedPayloads.length > 0) {
@@ -609,20 +560,9 @@ async function fetchAndProcessAirtablePayloads(
mightHaveMore = responseBody.mightHaveMore || false
if (nextCursor && typeof nextCursor === 'number' && nextCursor !== currentCursor) {
logger.info(`[${requestId}] Updating cursor for next potential iteration`, {
webhookId: webhookData.id,
previousCursor: currentCursor,
newCursor: nextCursor,
mightHaveMore,
})
currentCursor = nextCursor
// --- Add logging before and after DB update ---
const updatedConfig = { ...localProviderConfig, externalWebhookCursor: currentCursor }
logger.info(`[${requestId}] Attempting to persist new cursor to DB`, {
webhookId: webhookData.id,
cursor: currentCursor,
configToSave: updatedConfig, // Log the object being saved
})
try {
// Force a complete object update to ensure consistency in serverless env
await db
@@ -634,10 +574,6 @@ async function fetchAndProcessAirtablePayloads(
.where(eq(webhook.id, webhookData.id))
localProviderConfig.externalWebhookCursor = currentCursor // Update local copy too
logger.info(`[${requestId}] Successfully persisted new cursor to DB`, {
webhookId: webhookData.id,
newCursor: currentCursor,
})
} catch (dbError: any) {
logger.error(`[${requestId}] Failed to persist Airtable cursor to DB`, {
webhookId: webhookData.id,
@@ -661,10 +597,6 @@ async function fetchAndProcessAirtablePayloads(
})
mightHaveMore = false
} else {
logger.info(
`[${requestId}] Cursor unchanged or no new cursor, ending payload fetch loop`,
{ webhookId: webhookData.id, finalCursor: currentCursor, apiCall: apiCallCount }
)
mightHaveMore = false // Explicitly stop if cursor hasn't changed
}
} catch (fetchError: any) {
@@ -687,21 +619,8 @@ async function fetchAndProcessAirtablePayloads(
// Convert map values to array for final processing
const finalConsolidatedChanges = Array.from(consolidatedChangesMap.values())
logger.info(`[${requestId}] Finished polling Airtable`, {
webhookId: webhookData.id,
totalPayloadsFetched: payloadsFetched,
consolidatedCount: finalConsolidatedChanges.length,
finalCursor: currentCursor,
})
// --- Execute Workflow if we have changes (simplified - no lock check) ---
if (finalConsolidatedChanges.length > 0) {
logger.info(
`[${requestId}] Triggering workflow execution with ${finalConsolidatedChanges.length} changes`,
{
webhookId: webhookData.id,
}
)
try {
// Format the input for the executor using the consolidated changes
const input = { airtableChanges: finalConsolidatedChanges } // Use the consolidated array
@@ -714,10 +633,6 @@ async function fetchAndProcessAirtablePayloads(
executionError
)
}
} else {
logger.info(`[${requestId}] No new changes collected from Airtable, workflow not executed`, {
webhookId: webhookData.id,
})
}
} catch (error) {
// Catch any unexpected errors during the setup/polling logic itself