fix(connectors): audit fixes for sync engine, connectors, and knowledge tools

- Extract shared computeContentHash to connectors/utils.ts (dedup across 7 connectors)
- Include errored connectors in cron auto-retry query
- Add syncContext caching for Confluence (cloudId, spaceId)
- Batch Confluence label fetches with concurrency limit of 10
- Enforce maxPages in Confluence v2 path
- Clean up stale storage files on document update
- Retry stuck documents (pending/failed) after sync completes
- Soft-delete documents and reclaim tag slots on connector deletion
- Add incremental sync support to ConnectorConfig interface
- Fix offset:0 falsy check in list_documents tool

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Waleed Latif
2026-03-05 13:56:01 -08:00
parent ea6692517b
commit 3c3c5b5b1c
15 changed files with 508 additions and 1080 deletions

View File

@@ -1,11 +1,17 @@
import { db } from '@sim/db'
import { knowledgeBase, knowledgeConnector, knowledgeConnectorSyncLog } from '@sim/db/schema'
import {
document,
knowledgeBase,
knowledgeConnector,
knowledgeConnectorSyncLog,
} from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, desc, eq, isNull } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { cleanupUnusedTagDefinitions } from '@/lib/knowledge/tags/service'
import { refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/utils'
import { checkKnowledgeBaseAccess, checkKnowledgeBaseWriteAccess } from '@/app/api/knowledge/utils'
import { CONNECTOR_REGISTRY } from '@/connectors/registry'
@@ -238,7 +244,18 @@ export async function DELETE(request: NextRequest, { params }: RouteParams) {
)
)
logger.info(`[${requestId}] Soft-deleted connector ${connectorId}`)
// Soft-delete all documents belonging to this connector
await db
.update(document)
.set({ deletedAt: new Date() })
.where(and(eq(document.connectorId, connectorId), isNull(document.deletedAt)))
// Reclaim tag slots that are no longer used by any active connector
await cleanupUnusedTagDefinitions(knowledgeBaseId, requestId).catch((error) => {
logger.warn(`[${requestId}] Failed to cleanup tag definitions`, error)
})
logger.info(`[${requestId}] Soft-deleted connector ${connectorId} and its documents`)
return NextResponse.json({ success: true })
} catch (error) {

View File

@@ -1,7 +1,7 @@
import { db } from '@sim/db'
import { knowledgeConnector } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull, lte } from 'drizzle-orm'
import { and, inArray, isNull, lte } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { generateRequestId } from '@/lib/core/utils/request'
@@ -34,7 +34,7 @@ export async function GET(request: NextRequest) {
.from(knowledgeConnector)
.where(
and(
eq(knowledgeConnector.status, 'active'),
inArray(knowledgeConnector.status, ['active', 'error']),
lte(knowledgeConnector.nextSyncAt, now),
isNull(knowledgeConnector.deletedAt)
)

View File

@@ -2,23 +2,13 @@ import { createLogger } from '@sim/logger'
import { AirtableIcon } from '@/components/icons'
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
import { computeContentHash } from '@/connectors/utils'
const logger = createLogger('AirtableConnector')
const AIRTABLE_API = 'https://api.airtable.com/v0'
const PAGE_SIZE = 100
/**
 * Computes the SHA-256 digest of the given content, rendered as a
 * lowercase hex string. Used for change detection during sync.
 */
async function computeContentHash(content: string): Promise<string> {
  const digest = await crypto.subtle.digest('SHA-256', new TextEncoder().encode(content))
  let hex = ''
  for (const byte of new Uint8Array(digest)) hex += byte.toString(16).padStart(2, '0')
  return hex
}
/**
* Flattens a record's fields into a plain-text representation.
* Each field is rendered as "Field Name: value" on its own line.

View File

@@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger'
import { ConfluenceIcon } from '@/components/icons'
import { fetchWithRetry } from '@/lib/knowledge/documents/utils'
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
import { computeContentHash } from '@/connectors/utils'
import { getConfluenceCloudId } from '@/tools/confluence/utils'
const logger = createLogger('ConfluenceConnector')
@@ -28,20 +29,11 @@ function htmlToPlainText(html: string): string {
return text.replace(/\s+/g, ' ').trim()
}
/**
 * Hashes `content` with SHA-256 and returns the digest as a lowercase
 * hexadecimal string (used for document change detection).
 */
async function computeContentHash(content: string): Promise<string> {
  const bytes = new TextEncoder().encode(content)
  const digest = new Uint8Array(await crypto.subtle.digest('SHA-256', bytes))
  return [...digest].map((b) => b.toString(16).padStart(2, '0')).join('')
}
/**
* Fetches labels for a batch of page IDs using the v2 labels endpoint.
*/
const LABEL_FETCH_CONCURRENCY = 10
async function fetchLabelsForPages(
cloudId: string,
accessToken: string,
@@ -49,39 +41,42 @@ async function fetchLabelsForPages(
): Promise<Map<string, string[]>> {
const labelsByPageId = new Map<string, string[]>()
const results = await Promise.all(
pageIds.map(async (pageId) => {
try {
const url = `https://api.atlassian.com/ex/confluence/${cloudId}/wiki/api/v2/pages/${pageId}/labels`
const response = await fetchWithRetry(url, {
method: 'GET',
headers: {
Accept: 'application/json',
Authorization: `Bearer ${accessToken}`,
},
})
for (let i = 0; i < pageIds.length; i += LABEL_FETCH_CONCURRENCY) {
const batch = pageIds.slice(i, i + LABEL_FETCH_CONCURRENCY)
const results = await Promise.all(
batch.map(async (pageId) => {
try {
const url = `https://api.atlassian.com/ex/confluence/${cloudId}/wiki/api/v2/pages/${pageId}/labels`
const response = await fetchWithRetry(url, {
method: 'GET',
headers: {
Accept: 'application/json',
Authorization: `Bearer ${accessToken}`,
},
})
if (!response.ok) {
logger.warn(`Failed to fetch labels for page ${pageId}`, { status: response.status })
if (!response.ok) {
logger.warn(`Failed to fetch labels for page ${pageId}`, { status: response.status })
return { pageId, labels: [] as string[] }
}
const data = await response.json()
const labels = (data.results || []).map(
(label: Record<string, unknown>) => label.name as string
)
return { pageId, labels }
} catch (error) {
logger.warn(`Error fetching labels for page ${pageId}`, {
error: error instanceof Error ? error.message : String(error),
})
return { pageId, labels: [] as string[] }
}
})
)
const data = await response.json()
const labels = (data.results || []).map(
(label: Record<string, unknown>) => label.name as string
)
return { pageId, labels }
} catch (error) {
logger.warn(`Error fetching labels for page ${pageId}`, {
error: error instanceof Error ? error.message : String(error),
})
return { pageId, labels: [] as string[] }
}
})
)
for (const { pageId, labels } of results) {
labelsByPageId.set(pageId, labels)
for (const { pageId, labels } of results) {
labelsByPageId.set(pageId, labels)
}
}
return labelsByPageId
@@ -181,7 +176,8 @@ export const confluenceConnector: ConnectorConfig = {
listDocuments: async (
accessToken: string,
sourceConfig: Record<string, unknown>,
cursor?: string
cursor?: string,
syncContext?: Record<string, unknown>
): Promise<ExternalDocumentList> => {
const domain = sourceConfig.domain as string
const spaceKey = sourceConfig.spaceKey as string
@@ -189,9 +185,12 @@ export const confluenceConnector: ConnectorConfig = {
const labelFilter = (sourceConfig.labelFilter as string) || ''
const maxPages = sourceConfig.maxPages ? Number(sourceConfig.maxPages) : 0
const cloudId = await getConfluenceCloudId(domain, accessToken)
let cloudId = syncContext?.cloudId as string | undefined
if (!cloudId) {
cloudId = await getConfluenceCloudId(domain, accessToken)
if (syncContext) syncContext.cloudId = cloudId
}
// If label filtering is enabled, use CQL search via v1 API
if (labelFilter.trim()) {
return listDocumentsViaCql(
cloudId,
@@ -205,11 +204,23 @@ export const confluenceConnector: ConnectorConfig = {
)
}
// Otherwise use v2 API (default path)
const spaceId = await resolveSpaceId(cloudId, accessToken, spaceKey)
let spaceId = syncContext?.spaceId as string | undefined
if (!spaceId) {
spaceId = await resolveSpaceId(cloudId, accessToken, spaceKey)
if (syncContext) syncContext.spaceId = spaceId
}
if (contentType === 'all') {
return listAllContentTypes(cloudId, accessToken, domain, spaceId, spaceKey, maxPages, cursor)
return listAllContentTypes(
cloudId,
accessToken,
domain,
spaceId,
spaceKey,
maxPages,
cursor,
syncContext
)
}
return listDocumentsV2(
@@ -220,7 +231,8 @@ export const confluenceConnector: ConnectorConfig = {
spaceKey,
contentType,
maxPages,
cursor
cursor,
syncContext
)
},
@@ -336,7 +348,8 @@ async function listDocumentsV2(
spaceKey: string,
contentType: string,
maxPages: number,
cursor?: string
cursor?: string,
syncContext?: Record<string, unknown>
): Promise<ExternalDocumentList> {
const queryParams = new URLSearchParams()
queryParams.append('limit', '50')
@@ -370,7 +383,6 @@ async function listDocumentsV2(
const data = await response.json()
const results = data.results || []
// Fetch labels for all pages in this batch
const pageIds = results.map((page: Record<string, unknown>) => String(page.id))
const labelsByPageId = await fetchLabelsForPages(cloudId, accessToken, pageIds)
@@ -401,7 +413,6 @@ async function listDocumentsV2(
})
)
// Extract next cursor from _links.next
let nextCursor: string | undefined
const nextLink = (data._links as Record<string, string>)?.next
if (nextLink) {
@@ -412,16 +423,14 @@ async function listDocumentsV2(
}
}
// Enforce maxPages limit
if (maxPages > 0 && !cursor) {
// On subsequent pages, the sync engine tracks total count
// We signal stop by clearing hasMore when we'd exceed maxPages
}
const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 0) + documents.length
if (syncContext) syncContext.totalDocsFetched = totalFetched
const hitLimit = maxPages > 0 && totalFetched >= maxPages
return {
documents,
nextCursor,
hasMore: Boolean(nextCursor),
nextCursor: hitLimit ? undefined : nextCursor,
hasMore: hitLimit ? false : Boolean(nextCursor),
}
}
@@ -436,7 +445,8 @@ async function listAllContentTypes(
spaceId: string,
spaceKey: string,
maxPages: number,
cursor?: string
cursor?: string,
syncContext?: Record<string, unknown>
): Promise<ExternalDocumentList> {
let pageCursor: string | undefined
let blogCursor: string | undefined
@@ -466,7 +476,8 @@ async function listAllContentTypes(
spaceKey,
'page',
maxPages,
pageCursor
pageCursor,
syncContext
)
results.documents.push(...pagesResult.documents)
pageCursor = pagesResult.nextCursor
@@ -482,7 +493,8 @@ async function listAllContentTypes(
spaceKey,
'blogpost',
maxPages,
blogCursor
blogCursor,
syncContext
)
results.documents.push(...blogResult.documents)
blogCursor = blogResult.nextCursor

View File

@@ -2,23 +2,13 @@ import { createLogger } from '@sim/logger'
import { GithubIcon } from '@/components/icons'
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
import { computeContentHash } from '@/connectors/utils'
const logger = createLogger('GitHubConnector')
const GITHUB_API_URL = 'https://api.github.com'
const BATCH_SIZE = 30
/**
 * Produces the SHA-256 hash of `content` as a lowercase hex string,
 * used to detect whether a document changed since the last sync.
 */
async function computeContentHash(content: string): Promise<string> {
  const buffer = await crypto.subtle.digest('SHA-256', new TextEncoder().encode(content))
  return Array.from(new Uint8Array(buffer)).reduce(
    (hex, byte) => hex + byte.toString(16).padStart(2, '0'),
    ''
  )
}
/**
* Parses the repository string into owner and repo.
*/

View File

@@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger'
import { GoogleDriveIcon } from '@/components/icons'
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
import { computeContentHash } from '@/connectors/utils'
const logger = createLogger('GoogleDriveConnector')
@@ -22,14 +23,6 @@ const SUPPORTED_TEXT_MIME_TYPES = [
const MAX_EXPORT_SIZE = 10 * 1024 * 1024 // 10 MB (Google export limit)
/** Returns the lowercase-hex SHA-256 digest of `content` (change detection during sync). */
async function computeContentHash(content: string): Promise<string> {
  const encoded = new TextEncoder().encode(content)
  const hashed = new Uint8Array(await crypto.subtle.digest('SHA-256', encoded))
  return Array.from(hashed, (b) => b.toString(16).padStart(2, '0')).join('')
}
function isGoogleWorkspaceFile(mimeType: string): boolean {
return mimeType in GOOGLE_WORKSPACE_MIME_TYPES
}

View File

@@ -2,23 +2,13 @@ import { createLogger } from '@sim/logger'
import { JiraIcon } from '@/components/icons'
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
import { computeContentHash } from '@/connectors/utils'
import { extractAdfText, getJiraCloudId } from '@/tools/jira/utils'
const logger = createLogger('JiraConnector')
const PAGE_SIZE = 50
/**
 * Computes a SHA-256 hash of `content` and formats it as a lowercase
 * hexadecimal string, for use in sync change detection.
 */
async function computeContentHash(content: string): Promise<string> {
  const digest = new Uint8Array(
    await crypto.subtle.digest('SHA-256', new TextEncoder().encode(content))
  )
  const parts: string[] = []
  for (let i = 0; i < digest.length; i++) {
    parts.push(digest[i].toString(16).padStart(2, '0'))
  }
  return parts.join('')
}
/**
* Builds a plain-text representation of a Jira issue for knowledge base indexing.
*/

View File

@@ -2,22 +2,12 @@ import { createLogger } from '@sim/logger'
import { LinearIcon } from '@/components/icons'
import { fetchWithRetry } from '@/lib/knowledge/documents/utils'
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
import { computeContentHash } from '@/connectors/utils'
const logger = createLogger('LinearConnector')
const LINEAR_API = 'https://api.linear.app/graphql'
/**
 * SHA-256 hash of the given content, serialized as lowercase hex.
 * The hash is compared across syncs to skip unchanged documents.
 */
async function computeContentHash(content: string): Promise<string> {
  const payload = new TextEncoder().encode(content)
  const raw = await crypto.subtle.digest('SHA-256', payload)
  let out = ''
  new Uint8Array(raw).forEach((byte) => {
    out += byte.toString(16).padStart(2, '0')
  })
  return out
}
/**
* Strips Markdown formatting to produce plain text.
*/

View File

@@ -2,23 +2,13 @@ import { createLogger } from '@sim/logger'
import { NotionIcon } from '@/components/icons'
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
import { computeContentHash } from '@/connectors/utils'
const logger = createLogger('NotionConnector')
const NOTION_API_VERSION = '2022-06-28'
const NOTION_BASE_URL = 'https://api.notion.com/v1'
/**
 * Derives a lowercase-hex SHA-256 digest from `content`, used to
 * decide whether a synced document needs re-ingestion.
 */
async function computeContentHash(content: string): Promise<string> {
  const input = new TextEncoder().encode(content)
  const digestBytes = new Uint8Array(await crypto.subtle.digest('SHA-256', input))
  return [...digestBytes].map((value) => value.toString(16).padStart(2, '0')).join('')
}
/**
* Extracts the title from a Notion page's properties.
*/

View File

@@ -84,17 +84,27 @@ export interface ConnectorConfig {
/** Source configuration fields rendered in the add-connector UI */
configFields: ConnectorConfigField[]
/**
* Whether this connector supports incremental sync (only fetching changes since last sync).
* When true, the sync engine passes `lastSyncAt` to `listDocuments` so the connector
* can filter to only changed documents. Connectors without this flag always do full syncs.
*/
supportsIncrementalSync?: boolean
/**
* List all documents from the configured source (handles pagination via cursor).
* syncContext is a mutable object shared across all pages of a single sync run —
* connectors can use it to cache expensive lookups (e.g. schema fetches) without
* leaking state into module-level globals.
* lastSyncAt is provided when incremental sync is active — connectors should only
* return documents modified after this timestamp.
*/
listDocuments: (
accessToken: string,
sourceConfig: Record<string, unknown>,
cursor?: string,
syncContext?: Record<string, unknown>
syncContext?: Record<string, unknown>,
lastSyncAt?: Date
) => Promise<ExternalDocumentList>
/** Fetch a single document by its external ID */

View File

@@ -0,0 +1,11 @@
/**
 * Computes the SHA-256 hash of a content string, returned as a
 * lowercase hexadecimal string.
 *
 * Shared by all connectors for change detection during sync: the hash
 * of the fetched content is compared with the stored hash to decide
 * whether a document must be re-uploaded and re-processed.
 *
 * @param content - UTF-8 text to hash
 * @returns 64-character lowercase hex digest
 */
export async function computeContentHash(content: string): Promise<string> {
  const digest = await crypto.subtle.digest('SHA-256', new TextEncoder().encode(content))
  let hex = ''
  for (const byte of new Uint8Array(digest)) {
    hex += byte.toString(16).padStart(2, '0')
  }
  return hex
}

View File

@@ -6,10 +6,11 @@ import {
knowledgeConnectorSyncLog,
} from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull, ne } from 'drizzle-orm'
import { and, eq, inArray, isNull, ne } from 'drizzle-orm'
import { getInternalApiBaseUrl } from '@/lib/core/utils/urls'
import { isTriggerAvailable, processDocumentAsync } from '@/lib/knowledge/documents/service'
import { StorageService } from '@/lib/uploads'
import { deleteFile } from '@/lib/uploads/core/storage-service'
import { refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/utils'
import { knowledgeConnectorSync } from '@/background/knowledge-connector-sync'
import { CONNECTOR_REGISTRY } from '@/connectors/registry'
@@ -154,12 +155,22 @@ export async function executeSync(
const MAX_PAGES = 500
const syncContext: Record<string, unknown> = {}
// Determine if this sync should be incremental
const isIncremental =
connectorConfig.supportsIncrementalSync &&
connector.syncMode !== 'full' &&
!options?.fullSync &&
connector.lastSyncAt != null
const lastSyncAt =
isIncremental && connector.lastSyncAt ? new Date(connector.lastSyncAt) : undefined
for (let pageNum = 0; hasMore && pageNum < MAX_PAGES; pageNum++) {
const page = await connectorConfig.listDocuments(
accessToken,
sourceConfig,
cursor,
syncContext
syncContext,
lastSyncAt
)
externalDocs.push(...page.documents)
@@ -281,7 +292,8 @@ export async function executeSync(
}
}
if (options?.fullSync || connector.syncMode === 'full') {
// Skip deletion reconciliation during incremental syncs — results only contain changed docs
if (!isIncremental && (options?.fullSync || connector.syncMode === 'full')) {
for (const existing of existingDocs) {
if (existing.externalId && !seenExternalIds.has(existing.externalId)) {
await db
@@ -293,6 +305,45 @@ export async function executeSync(
}
}
// Retry stuck documents that failed or never completed processing
const stuckDocs = await db
.select({
id: document.id,
fileUrl: document.fileUrl,
filename: document.filename,
fileSize: document.fileSize,
})
.from(document)
.where(
and(
eq(document.connectorId, connectorId),
inArray(document.processingStatus, ['pending', 'failed']),
isNull(document.deletedAt)
)
)
if (stuckDocs.length > 0) {
logger.info(`Retrying ${stuckDocs.length} stuck documents`, { connectorId })
for (const doc of stuckDocs) {
processDocumentAsync(
connector.knowledgeBaseId,
doc.id,
{
filename: doc.filename ?? 'document.txt',
fileUrl: doc.fileUrl ?? '',
fileSize: doc.fileSize ?? 0,
mimeType: 'text/plain',
},
{}
).catch((error) => {
logger.warn('Failed to retry stuck document', {
documentId: doc.id,
error: error instanceof Error ? error.message : String(error),
})
})
}
}
await db
.update(knowledgeConnectorSyncLog)
.set({
@@ -450,6 +501,14 @@ async function updateDocument(
extDoc: ExternalDocument,
sourceConfig?: Record<string, unknown>
): Promise<void> {
// Fetch old file URL before uploading replacement
const existingRows = await db
.select({ fileUrl: document.fileUrl })
.from(document)
.where(eq(document.id, existingDocId))
.limit(1)
const oldFileUrl = existingRows[0]?.fileUrl
const contentBuffer = Buffer.from(extDoc.content, 'utf-8')
const safeTitle = extDoc.title.replace(/[^a-zA-Z0-9.-]/g, '_')
const customKey = `kb/${Date.now()}-${existingDocId}-${safeTitle}.txt`
@@ -485,6 +544,22 @@ async function updateDocument(
})
.where(eq(document.id, existingDocId))
// Clean up old storage file
if (oldFileUrl) {
try {
const urlPath = new URL(oldFileUrl, 'http://localhost').pathname
const storageKey = urlPath.replace(/^\/api\/uploads\//, '')
if (storageKey && storageKey !== urlPath) {
await deleteFile({ key: storageKey, context: 'knowledge-base' })
}
} catch (error) {
logger.warn('Failed to delete old storage file', {
documentId: existingDocId,
error: error instanceof Error ? error.message : String(error),
})
}
}
processDocumentAsync(
knowledgeBaseId,
existingDocId,

View File

@@ -45,8 +45,8 @@ export const knowledgeListDocumentsTool: ToolConfig<any, KnowledgeListDocumentsR
const queryParams = new URLSearchParams()
if (params.search) queryParams.set('search', params.search)
if (params.enabledFilter) queryParams.set('enabledFilter', params.enabledFilter)
if (params.limit) queryParams.set('limit', String(params.limit))
if (params.offset) queryParams.set('offset', String(params.offset))
if (params.limit != null) queryParams.set('limit', String(params.limit))
if (params.offset != null) queryParams.set('offset', String(params.offset))
const qs = queryParams.toString()
return `/api/knowledge/${params.knowledgeBaseId}/documents${qs ? `?${qs}` : ''}`
},

File diff suppressed because it is too large Load Diff

View File

@@ -1137,4 +1137,4 @@
"breakpoints": true
}
]
}
}