mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
fix(connectors): contentDeferred pattern + validation fixes across all connectors (#3793)
* fix(knowledge): enqueue connector docs per-batch to survive sync timeouts * fix(connectors): convert all connectors to contentDeferred pattern and fix validation issues All 10 connectors now use contentDeferred: true in listDocuments, returning lightweight metadata stubs instead of downloading content during listing. Content is fetched lazily via getDocument only for new/changed documents, preventing Trigger.dev task timeouts on large syncs. Connector-specific fixes from validation audit: - Google Drive: metadata-based contentHash, orderBy for deterministic pagination, precise maxFiles, byte-length size check with truncation warning - OneDrive: metadata-based contentHash, orderBy for deterministic pagination - SharePoint: metadata-based contentHash, byte-length size check - Dropbox: metadata-based contentHash using content_hash field - Notion: code/equation block extraction, empty page fallback to title, reduced CHILD_PAGE_CONCURRENCY to 5, syncContext parameter - Confluence: syncContext caching for cloudId, reduced label concurrency to 5 - Gmail: use joinTagArray for label tags - Obsidian: syncRunId-based stub hash for forced re-fetch, mtime-based hash in getDocument, .trim() on vaultUrl, lightweight validateConfig - Evernote: retryOptions threaded through apiFindNotesMetadata and apiGetNote - GitHub: added contentDeferred: false to getDocument, syncContext parameter Infrastructure: - sync-engine: added syncRunId to syncContext for Obsidian change detection - confluence/utils: replaced raw fetch with fetchWithRetry, added retryOptions - oauth: added supportsRefreshTokenRotation: false for Dropbox - Updated add-connector and validate-connector skills with contentDeferred docs Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(connectors): address PR review comments - metadata merge, retryOptions, UTF-8 safety - Sync engine: merge metadata from getDocument during deferred hydration, so Gmail/Obsidian/Confluence tags and metadata survive the stub→full 
transition - Evernote: pass retryOptions {retries:3, backoff:500} from listDocuments and getDocument callers into apiFindNotesMetadata and apiGetNote - Google Drive + SharePoint: safe UTF-8 truncation that walks back to the last complete character boundary instead of splitting multi-byte chars Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(evernote): use correct RetryOptions property names maxRetries/initialDelayMs instead of retries/backoff to match the RetryOptions interface from lib/knowledge/documents/utils. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(sync-engine): merge title from getDocument and skip unchanged docs after hydration - Merge title from getDocument during deferred hydration so Gmail documents get the email Subject header instead of the snippet text - After hydration, compare the hydrated contentHash against the stored DB hash — if they match, skip the update. This prevents Obsidian (and any connector with a force-refresh stub hash) from re-uploading and re-processing unchanged documents every sync Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(sync-engine): dedup externalIds, enable deletion reconciliation, merge sourceUrl Three sync engine gaps identified during audit: 1. Duplicate externalId guard: if a connector returns the same externalId across pages (pagination overlap), skip the second occurrence to prevent unique constraint violations on add and double-uploads on update. 2. Deletion reconciliation: previously required explicit fullSync or syncMode='full', meaning docs deleted from the source accumulated in the KB forever. Now runs on all non-incremental syncs (which return ALL docs). Includes a safety threshold: if >50% of existing docs (and >5 docs) would be deleted, skip and warn — protects against partial listing failures. Explicit fullSync bypasses the threshold. 3. 
sourceUrl merge: hydration now picks up sourceUrl from getDocument, falling back to the stub's sourceUrl if getDocument doesn't set one. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * lint * fix(connectors): confluence version metadata fallback and google drive maxFiles guard - Confluence: use `version?.number` directly (undefined) in metadata instead of `?? ''` (empty string) to prevent Number('') = 0 passing NaN check in mapTags. Hash still uses `?? ''` for string interpolation. - Google Drive: add early return when previouslyFetched >= maxFiles to prevent effectivePageSize <= 0 which violates the API's pageSize requirement (1-1000). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(connectors): blogpost labels and capped listing deletion reconciliation - Confluence: fetchLabelsForPages now tries both /pages/{id}/labels and /blogposts/{id}/labels, preventing label loss when getDocument hydrates blogpost content (previously returned empty labels on 404). - Sync engine: skip deletion reconciliation when listing was capped (maxFiles/maxThreads). Connectors signal this via syncContext.listingCapped. Prevents incorrect deletion of docs beyond the cap that still exist in source. fullSync override still forces deletion for explicit cleanup. - Google Drive & Gmail: set syncContext.listingCapped = true when cap is hit. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(connectors): set syncContext.listingCapped in all connectors with caps OneDrive, Dropbox, SharePoint, Confluence (v2 + CQL), and Notion (3 listing functions) now set syncContext.listingCapped = true when their respective maxFiles/maxPages limit is hit. Without this, the sync engine's deletion reconciliation would run against an incomplete listing and incorrectly hard-delete documents that exist in the source but fell outside the cap window. 
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(evernote): thread retryOptions through apiListTags and apiListNotebooks All calls to apiListTags and apiListNotebooks in both listDocuments and getDocument now pass retryOptions for consistent retry protection across all Thrift RPC calls. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -71,12 +71,14 @@ export const {service}Connector: ConnectorConfig = {
|
||||
],
|
||||
|
||||
listDocuments: async (accessToken, sourceConfig, cursor) => {
|
||||
// Paginate via cursor, extract text, compute SHA-256 hash
|
||||
// Return metadata stubs with contentDeferred: true (if per-doc content fetch needed)
|
||||
// Or full documents with content (if list API returns content inline)
|
||||
// Return { documents: ExternalDocument[], nextCursor?, hasMore }
|
||||
},
|
||||
|
||||
getDocument: async (accessToken, sourceConfig, externalId) => {
|
||||
// Return ExternalDocument or null
|
||||
// Fetch full content for a single document
|
||||
// Return ExternalDocument with contentDeferred: false, or null
|
||||
},
|
||||
|
||||
validateConfig: async (accessToken, sourceConfig) => {
|
||||
@@ -281,26 +283,110 @@ Every document returned from `listDocuments`/`getDocument` must include:
|
||||
{
|
||||
externalId: string // Source-specific unique ID
|
||||
title: string // Document title
|
||||
content: string // Extracted plain text
|
||||
content: string // Extracted plain text (or '' if contentDeferred)
|
||||
contentDeferred?: boolean // true = content will be fetched via getDocument
|
||||
mimeType: 'text/plain' // Always text/plain (content is extracted)
|
||||
contentHash: string // SHA-256 of content (change detection)
|
||||
contentHash: string // Metadata-based hash for change detection
|
||||
sourceUrl?: string // Link back to original (stored on document record)
|
||||
metadata?: Record<string, unknown> // Source-specific data (fed to mapTags)
|
||||
}
|
||||
```
|
||||
|
||||
## Content Hashing (Required)
|
||||
## Content Deferral (Required for file/content-download connectors)
|
||||
|
||||
The sync engine uses content hashes for change detection:
|
||||
**All connectors that require per-document API calls to fetch content MUST use `contentDeferred: true`.** This is the standard pattern — `listDocuments` returns lightweight metadata stubs, and content is fetched lazily by the sync engine via `getDocument` only for new/changed documents.
|
||||
|
||||
This pattern is critical for reliability: the sync engine processes documents in batches and enqueues each batch for processing immediately. If a sync times out, all previously-batched documents are already queued. Without deferral, content downloads during listing can exhaust the sync task's time budget before any documents are saved.
|
||||
|
||||
### When to use `contentDeferred: true`
|
||||
|
||||
- The service's list API does NOT return document content (only metadata)
|
||||
- Content requires a separate download/export API call per document
|
||||
- Examples: Google Drive, OneDrive, SharePoint, Dropbox, Notion, Confluence, Gmail, Obsidian, Evernote, GitHub
|
||||
|
||||
### When NOT to use `contentDeferred`
|
||||
|
||||
- The list API already returns the full content inline (e.g., Slack messages, Reddit posts, HubSpot notes)
|
||||
- No per-document API call is needed to get content
|
||||
|
||||
### Content Hash Strategy
|
||||
|
||||
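For `contentDeferred` connectors, use a **metadata-based** `contentHash` — never a content-based hash. The hash must be derivable from the list response metadata alone, so the sync engine can detect changes without downloading content. (Connectors whose list API already returns content inline may instead hash the content with `computeContentHash` from `@/connectors/utils`.)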
|
||||
|
||||
Good metadata hash sources:
|
||||
- `modifiedTime` / `lastModifiedDateTime` — changes when file is edited
|
||||
- Git blob SHA — unique per content version
|
||||
- API-provided content hash (e.g., Dropbox `content_hash`)
|
||||
- Version number (e.g., Confluence page version)
|
||||
|
||||
Format: `{service}:{id}:{changeIndicator}`
|
||||
|
||||
```typescript
|
||||
async function computeContentHash(content: string): Promise<string> {
|
||||
const data = new TextEncoder().encode(content)
|
||||
const hashBuffer = await crypto.subtle.digest('SHA-256', data)
|
||||
return Array.from(new Uint8Array(hashBuffer)).map(b => b.toString(16).padStart(2, '0')).join('')
|
||||
// Google Drive: modifiedTime changes on edit
|
||||
contentHash: `gdrive:${file.id}:${file.modifiedTime ?? ''}`
|
||||
|
||||
// GitHub: blob SHA is a content-addressable hash
|
||||
contentHash: `gitsha:${item.sha}`
|
||||
|
||||
// Dropbox: API provides content_hash
|
||||
contentHash: `dropbox:${entry.id}:${entry.content_hash ?? entry.server_modified}`
|
||||
|
||||
// Confluence: version number increments on edit
|
||||
contentHash: `confluence:${page.id}:${page.version.number}`
|
||||
```
|
||||
|
||||
**Critical invariant:** The `contentHash` MUST be identical whether produced by `listDocuments` (stub) or `getDocument` (full doc). Both should use the same stub function to guarantee this.
|
||||
|
||||
### Implementation Pattern
|
||||
|
||||
```typescript
|
||||
// 1. Create a stub function (sync, no API calls)
|
||||
function fileToStub(file: ServiceFile): ExternalDocument {
|
||||
return {
|
||||
externalId: file.id,
|
||||
title: file.name || 'Untitled',
|
||||
content: '',
|
||||
contentDeferred: true,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: `https://service.com/file/${file.id}`,
|
||||
contentHash: `service:${file.id}:${file.modifiedTime ?? ''}`,
|
||||
metadata: { /* fields needed by mapTags */ },
|
||||
}
|
||||
}
|
||||
|
||||
// 2. listDocuments returns stubs (fast, metadata only)
|
||||
listDocuments: async (accessToken, sourceConfig, cursor) => {
|
||||
const response = await fetchWithRetry(listUrl, { ... })
|
||||
const files = (await response.json()).files
|
||||
const documents = files.map(fileToStub)
|
||||
return { documents, nextCursor, hasMore }
|
||||
}
|
||||
|
||||
// 3. getDocument fetches content and returns full doc with SAME contentHash
|
||||
getDocument: async (accessToken, sourceConfig, externalId) => {
|
||||
const metadata = await fetchWithRetry(metadataUrl, { ... })
|
||||
const file = await metadata.json()
|
||||
if (file.trashed) return null
|
||||
|
||||
try {
|
||||
const content = await fetchContent(accessToken, file)
|
||||
if (!content.trim()) return null
|
||||
const stub = fileToStub(file)
|
||||
return { ...stub, content, contentDeferred: false }
|
||||
} catch (error) {
|
||||
logger.warn(`Failed to fetch content for: ${file.name}`, { error })
|
||||
return null
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Reference Implementations
|
||||
|
||||
- **Google Drive**: `connectors/google-drive/google-drive.ts` — file download/export with `modifiedTime` hash
|
||||
- **GitHub**: `connectors/github/github.ts` — git blob SHA hash
|
||||
- **Notion**: `connectors/notion/notion.ts` — blocks API with `last_edited_time` hash
|
||||
- **Confluence**: `connectors/confluence/confluence.ts` — version number hash
|
||||
|
||||
## tagDefinitions — Declared Tag Definitions
|
||||
|
||||
Declare which tags the connector populates using semantic IDs. Shown in the add-connector modal as opt-out checkboxes.
|
||||
@@ -409,7 +495,10 @@ export const CONNECTOR_REGISTRY: ConnectorRegistry = {
|
||||
|
||||
## Reference Implementations
|
||||
|
||||
- **OAuth**: `apps/sim/connectors/confluence/confluence.ts` — multiple config field types, `mapTags`, label fetching
|
||||
- **OAuth + contentDeferred**: `apps/sim/connectors/google-drive/google-drive.ts` — file download with metadata-based hash, `orderBy` for deterministic pagination
|
||||
- **OAuth + contentDeferred (blocks API)**: `apps/sim/connectors/notion/notion.ts` — complex block content extraction deferred to `getDocument`
|
||||
- **OAuth + contentDeferred (git)**: `apps/sim/connectors/github/github.ts` — blob SHA hash, tree listing
|
||||
- **OAuth + contentDeferred (version hash)**: `apps/sim/connectors/confluence/confluence.ts` — multiple config field types, `mapTags`, label fetching, version-number `contentHash`
|
||||
- **API key**: `apps/sim/connectors/fireflies/fireflies.ts` — GraphQL API with Bearer token auth
|
||||
|
||||
## Checklist
|
||||
@@ -425,7 +514,9 @@ export const CONNECTOR_REGISTRY: ConnectorRegistry = {
|
||||
- `selectorKey` exists in `hooks/selectors/registry.ts`
|
||||
- `dependsOn` references selector field IDs (not `canonicalParamId`)
|
||||
- Dependency `canonicalParamId` values exist in `SELECTOR_CONTEXT_FIELDS`
|
||||
- [ ] `listDocuments` handles pagination and computes content hashes
|
||||
- [ ] `listDocuments` handles pagination with metadata-based content hashes
|
||||
- [ ] `contentDeferred: true` used if content requires per-doc API calls (file download, export, blocks fetch)
|
||||
- [ ] `contentHash` is metadata-based (not content-based) and identical between stub and `getDocument`
|
||||
- [ ] `sourceUrl` set on each ExternalDocument (full URL, not relative)
|
||||
- [ ] `metadata` includes source-specific data for tag mapping
|
||||
- [ ] `tagDefinitions` declared for each semantic key returned by `mapTags`
|
||||
|
||||
@@ -141,12 +141,24 @@ For each API endpoint the connector calls:
|
||||
|
||||
## Step 6: Validate Data Transformation
|
||||
|
||||
### Content Deferral (CRITICAL)
|
||||
Connectors that require per-document API calls to fetch content (file download, export, blocks fetch) MUST use `contentDeferred: true`. This is the standard pattern for reliability — without it, content downloads during listing can exhaust the sync task's time budget before any documents are saved.
|
||||
|
||||
- [ ] If the connector downloads content per-doc during `listDocuments`, it MUST use `contentDeferred: true` instead
|
||||
- [ ] `listDocuments` returns lightweight stubs with `content: ''` and `contentDeferred: true`
|
||||
- [ ] `getDocument` fetches actual content and returns the full document with `contentDeferred: false`
|
||||
- [ ] A shared stub function (e.g., `fileToStub`) is used by both `listDocuments` and `getDocument` to guarantee `contentHash` consistency
|
||||
- [ ] `contentHash` is metadata-based (e.g., `service:{id}:{modifiedTime}`), NOT content-based — it must be derivable from list metadata alone
|
||||
- [ ] The `contentHash` is identical whether produced by `listDocuments` or `getDocument`
|
||||
|
||||
Connectors where the list API already returns content inline (e.g., Slack messages, Reddit posts) do NOT need `contentDeferred`.
|
||||
|
||||
### ExternalDocument Construction
|
||||
- [ ] `externalId` is a stable, unique identifier from the source API
|
||||
- [ ] `title` is extracted from the correct field and has a sensible fallback (e.g., `'Untitled'`)
|
||||
- [ ] `content` is plain text — HTML content is stripped using `htmlToPlainText` from `@/connectors/utils`
|
||||
- [ ] `mimeType` is `'text/plain'`
|
||||
- [ ] `contentHash` is computed using `computeContentHash` from `@/connectors/utils`
|
||||
- [ ] `contentHash` uses a metadata-based format (e.g., `service:{id}:{modifiedTime}`) for connectors with `contentDeferred: true`, or `computeContentHash` from `@/connectors/utils` for inline-content connectors
|
||||
- [ ] `sourceUrl` is a valid, complete URL back to the original resource (not relative)
|
||||
- [ ] `metadata` contains all fields referenced by `mapTags` and `tagDefinitions`
|
||||
|
||||
@@ -200,6 +212,8 @@ For each API endpoint the connector calls:
|
||||
- [ ] Fetches a single document by `externalId`
|
||||
- [ ] Returns `null` for 404 / not found (does not throw)
|
||||
- [ ] Returns the same `ExternalDocument` shape as `listDocuments`
|
||||
- [ ] If `listDocuments` uses `contentDeferred: true`, `getDocument` MUST fetch actual content and return `contentDeferred: false`
|
||||
- [ ] If `listDocuments` uses `contentDeferred: true`, `getDocument` MUST use the same stub function to ensure `contentHash` is identical
|
||||
- [ ] Handles all content types that `listDocuments` can produce (e.g., if `listDocuments` returns both pages and blogposts, `getDocument` must handle both — not hardcode one endpoint)
|
||||
- [ ] Forwards `syncContext` if it needs cached state (user names, field maps, etc.)
|
||||
- [ ] Error handling is graceful (catches, logs, returns null or throws with context)
|
||||
@@ -253,6 +267,8 @@ Group findings by severity:
|
||||
- Missing error handling that would crash the sync
|
||||
- `requiredScopes` not a subset of OAuth provider scopes
|
||||
- Query/filter injection: user-controlled values interpolated into OData `$filter`, SOQL, or query strings without escaping
|
||||
- Per-document content download in `listDocuments` without `contentDeferred: true` — causes sync timeouts for large document sets
|
||||
- `contentHash` mismatch between `listDocuments` stub and `getDocument` return — causes unnecessary re-processing every sync
|
||||
|
||||
**Warning** (incorrect behavior, data quality issues, or convention violations):
|
||||
- HTML content not stripped via `htmlToPlainText`
|
||||
@@ -300,6 +316,7 @@ After fixing, confirm:
|
||||
- [ ] Validated scopes are sufficient for all API endpoints the connector calls
|
||||
- [ ] Validated token refresh config (`useBasicAuth`, `supportsRefreshTokenRotation`)
|
||||
- [ ] Validated pagination: cursor names, page sizes, hasMore logic, no silent caps
|
||||
- [ ] Validated content deferral: `contentDeferred: true` used when per-doc content fetch required, metadata-based `contentHash` consistent between stub and `getDocument`
|
||||
- [ ] Validated data transformation: plain text extraction, HTML stripping, content hashing
|
||||
- [ ] Validated tag definitions match mapTags output, correct fieldTypes
|
||||
- [ ] Validated config fields: canonical pairs, selector keys, required flags
|
||||
|
||||
@@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { ConfluenceIcon } from '@/components/icons'
|
||||
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
|
||||
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
|
||||
import { computeContentHash, htmlToPlainText, joinTagArray, parseTagDate } from '@/connectors/utils'
|
||||
import { htmlToPlainText, joinTagArray, parseTagDate } from '@/connectors/utils'
|
||||
import { getConfluenceCloudId } from '@/tools/confluence/utils'
|
||||
|
||||
const logger = createLogger('ConfluenceConnector')
|
||||
@@ -17,7 +17,7 @@ export function escapeCql(value: string): string {
|
||||
/**
|
||||
* Fetches labels for a batch of page or blog post IDs using the v2 labels endpoints (tries `pages/{id}/labels` first, then `blogposts/{id}/labels`).
|
||||
*/
|
||||
const LABEL_FETCH_CONCURRENCY = 10
|
||||
const LABEL_FETCH_CONCURRENCY = 5
|
||||
|
||||
async function fetchLabelsForPages(
|
||||
cloudId: string,
|
||||
@@ -31,23 +31,34 @@ async function fetchLabelsForPages(
|
||||
const results = await Promise.all(
|
||||
batch.map(async (pageId) => {
|
||||
try {
|
||||
const url = `https://api.atlassian.com/ex/confluence/${cloudId}/wiki/api/v2/pages/${pageId}/labels`
|
||||
const response = await fetchWithRetry(url, {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
Accept: 'application/json',
|
||||
Authorization: `Bearer ${accessToken}`,
|
||||
},
|
||||
})
|
||||
let data: Record<string, unknown> | null = null
|
||||
for (const contentType of ['pages', 'blogposts']) {
|
||||
const url = `https://api.atlassian.com/ex/confluence/${cloudId}/wiki/api/v2/${contentType}/${pageId}/labels`
|
||||
const response = await fetchWithRetry(url, {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
Accept: 'application/json',
|
||||
Authorization: `Bearer ${accessToken}`,
|
||||
},
|
||||
})
|
||||
|
||||
if (!response.ok) {
|
||||
logger.warn(`Failed to fetch labels for page ${pageId}`, { status: response.status })
|
||||
if (response.ok) {
|
||||
data = await response.json()
|
||||
break
|
||||
}
|
||||
if (response.status !== 404) {
|
||||
logger.warn(`Failed to fetch labels for ${contentType} ${pageId}`, {
|
||||
status: response.status,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if (!data) {
|
||||
return { pageId, labels: [] as string[] }
|
||||
}
|
||||
|
||||
const data = await response.json()
|
||||
const labels = (data.results || []).map(
|
||||
(label: Record<string, unknown>) => label.name as string
|
||||
const labels = ((data.results as Record<string, unknown>[]) || []).map(
|
||||
(label) => label.name as string
|
||||
)
|
||||
return { pageId, labels }
|
||||
} catch (error) {
|
||||
@@ -68,35 +79,29 @@ async function fetchLabelsForPages(
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a v1 CQL search result item to an ExternalDocument.
|
||||
* Converts a v1 CQL search result item to a lightweight metadata stub.
|
||||
*/
|
||||
async function cqlResultToDocument(
|
||||
item: Record<string, unknown>,
|
||||
domain: string
|
||||
): Promise<ExternalDocument> {
|
||||
const body = item.body as Record<string, Record<string, string>> | undefined
|
||||
const rawContent = body?.storage?.value || ''
|
||||
const plainText = htmlToPlainText(rawContent)
|
||||
const contentHash = await computeContentHash(plainText)
|
||||
|
||||
function cqlResultToStub(item: Record<string, unknown>, domain: string): ExternalDocument {
|
||||
const version = item.version as Record<string, unknown> | undefined
|
||||
const links = item._links as Record<string, string> | undefined
|
||||
const metadata = item.metadata as Record<string, unknown> | undefined
|
||||
const labelsWrapper = metadata?.labels as Record<string, unknown> | undefined
|
||||
const labelResults = (labelsWrapper?.results || []) as Record<string, unknown>[]
|
||||
const labels = labelResults.map((l) => l.name as string)
|
||||
const versionNumber = version?.number
|
||||
|
||||
return {
|
||||
externalId: String(item.id),
|
||||
title: (item.title as string) || 'Untitled',
|
||||
content: plainText,
|
||||
content: '',
|
||||
contentDeferred: true,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: links?.webui ? `https://${domain}/wiki${links.webui}` : undefined,
|
||||
contentHash,
|
||||
contentHash: `confluence:${item.id}:${versionNumber ?? ''}`,
|
||||
metadata: {
|
||||
spaceId: (item.space as Record<string, unknown>)?.key,
|
||||
status: item.status,
|
||||
version: version?.number,
|
||||
version: versionNumber,
|
||||
labels,
|
||||
lastModified: version?.when,
|
||||
},
|
||||
@@ -238,10 +243,15 @@ export const confluenceConnector: ConnectorConfig = {
|
||||
getDocument: async (
|
||||
accessToken: string,
|
||||
sourceConfig: Record<string, unknown>,
|
||||
externalId: string
|
||||
externalId: string,
|
||||
syncContext?: Record<string, unknown>
|
||||
): Promise<ExternalDocument | null> => {
|
||||
const domain = sourceConfig.domain as string
|
||||
const cloudId = await getConfluenceCloudId(domain, accessToken)
|
||||
let cloudId = syncContext?.cloudId as string | undefined
|
||||
if (!cloudId) {
|
||||
cloudId = await getConfluenceCloudId(domain, accessToken)
|
||||
if (syncContext) syncContext.cloudId = cloudId
|
||||
}
|
||||
|
||||
// Try pages first, fall back to blogposts if not found
|
||||
let page: Record<string, unknown> | null = null
|
||||
@@ -269,26 +279,26 @@ export const confluenceConnector: ConnectorConfig = {
|
||||
const storage = body?.storage as Record<string, unknown> | undefined
|
||||
const rawContent = (storage?.value as string) || ''
|
||||
const plainText = htmlToPlainText(rawContent)
|
||||
const contentHash = await computeContentHash(plainText)
|
||||
|
||||
// Fetch labels for this page
|
||||
const labelMap = await fetchLabelsForPages(cloudId, accessToken, [String(page.id)])
|
||||
const labels = labelMap.get(String(page.id)) ?? []
|
||||
|
||||
const links = page._links as Record<string, unknown> | undefined
|
||||
const version = page.version as Record<string, unknown> | undefined
|
||||
const versionNumber = version?.number
|
||||
|
||||
return {
|
||||
externalId: String(page.id),
|
||||
title: (page.title as string) || 'Untitled',
|
||||
content: plainText,
|
||||
contentDeferred: false,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: links?.webui ? `https://${domain}/wiki${links.webui}` : undefined,
|
||||
contentHash,
|
||||
contentHash: `confluence:${page.id}:${versionNumber ?? ''}`,
|
||||
metadata: {
|
||||
spaceId: page.spaceId,
|
||||
status: page.status,
|
||||
version: version?.number,
|
||||
version: versionNumber,
|
||||
labels,
|
||||
lastModified: version?.createdAt,
|
||||
},
|
||||
@@ -379,7 +389,6 @@ async function listDocumentsV2(
|
||||
): Promise<ExternalDocumentList> {
|
||||
const queryParams = new URLSearchParams()
|
||||
queryParams.append('limit', '250')
|
||||
queryParams.append('body-format', 'storage')
|
||||
if (cursor) {
|
||||
queryParams.append('cursor', cursor)
|
||||
}
|
||||
@@ -409,35 +418,30 @@ async function listDocumentsV2(
|
||||
const data = await response.json()
|
||||
const results = data.results || []
|
||||
|
||||
const pageIds = results.map((page: Record<string, unknown>) => String(page.id))
|
||||
const labelsByPageId = await fetchLabelsForPages(cloudId, accessToken, pageIds)
|
||||
const documents: ExternalDocument[] = results.map((page: Record<string, unknown>) => {
|
||||
const pageId = String(page.id)
|
||||
const version = page.version as Record<string, unknown> | undefined
|
||||
const versionNumber = version?.number
|
||||
|
||||
const documents: ExternalDocument[] = await Promise.all(
|
||||
results.map(async (page: Record<string, unknown>) => {
|
||||
const rawContent = (page.body as Record<string, Record<string, string>>)?.storage?.value || ''
|
||||
const plainText = htmlToPlainText(rawContent)
|
||||
const contentHash = await computeContentHash(plainText)
|
||||
const pageId = String(page.id)
|
||||
|
||||
return {
|
||||
externalId: pageId,
|
||||
title: (page.title as string) || 'Untitled',
|
||||
content: plainText,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: (page._links as Record<string, string>)?.webui
|
||||
? `https://${domain}/wiki${(page._links as Record<string, string>).webui}`
|
||||
: undefined,
|
||||
contentHash,
|
||||
metadata: {
|
||||
spaceId: page.spaceId,
|
||||
status: page.status,
|
||||
version: (page.version as Record<string, unknown>)?.number,
|
||||
labels: labelsByPageId.get(pageId) ?? [],
|
||||
lastModified: (page.version as Record<string, unknown>)?.createdAt,
|
||||
},
|
||||
}
|
||||
})
|
||||
)
|
||||
return {
|
||||
externalId: pageId,
|
||||
title: (page.title as string) || 'Untitled',
|
||||
content: '',
|
||||
contentDeferred: true,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: (page._links as Record<string, string>)?.webui
|
||||
? `https://${domain}/wiki${(page._links as Record<string, string>).webui}`
|
||||
: undefined,
|
||||
contentHash: `confluence:${pageId}:${versionNumber ?? ''}`,
|
||||
metadata: {
|
||||
spaceId: page.spaceId,
|
||||
status: page.status,
|
||||
version: versionNumber,
|
||||
labels: [],
|
||||
lastModified: version?.createdAt,
|
||||
},
|
||||
}
|
||||
})
|
||||
|
||||
let nextCursor: string | undefined
|
||||
const nextLink = (data._links as Record<string, string>)?.next
|
||||
@@ -452,6 +456,7 @@ async function listDocumentsV2(
|
||||
const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 0) + documents.length
|
||||
if (syncContext) syncContext.totalDocsFetched = totalFetched
|
||||
const hitLimit = maxPages > 0 && totalFetched >= maxPages
|
||||
if (hitLimit && syncContext) syncContext.listingCapped = true
|
||||
|
||||
return {
|
||||
documents,
|
||||
@@ -584,7 +589,7 @@ async function listDocumentsViaCql(
|
||||
queryParams.append('cql', cql)
|
||||
queryParams.append('limit', String(limit))
|
||||
queryParams.append('start', String(start))
|
||||
queryParams.append('expand', 'body.storage,version,metadata.labels')
|
||||
queryParams.append('expand', 'version,metadata.labels')
|
||||
|
||||
const url = `https://api.atlassian.com/ex/confluence/${cloudId}/wiki/rest/api/content/search?${queryParams.toString()}`
|
||||
|
||||
@@ -610,13 +615,14 @@ async function listDocumentsViaCql(
|
||||
const data = await response.json()
|
||||
const results = data.results || []
|
||||
|
||||
const documents: ExternalDocument[] = await Promise.all(
|
||||
results.map((item: Record<string, unknown>) => cqlResultToDocument(item, domain))
|
||||
const documents: ExternalDocument[] = results.map((item: Record<string, unknown>) =>
|
||||
cqlResultToStub(item, domain)
|
||||
)
|
||||
|
||||
const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 0) + documents.length
|
||||
if (syncContext) syncContext.totalDocsFetched = totalFetched
|
||||
const hitLimit = maxPages > 0 && totalFetched >= maxPages
|
||||
if (hitLimit && syncContext) syncContext.listingCapped = true
|
||||
|
||||
const totalSize = (data.totalSize as number) ?? 0
|
||||
const nextStart = start + results.length
|
||||
|
||||
@@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { DropboxIcon } from '@/components/icons'
|
||||
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
|
||||
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
|
||||
import { computeContentHash, htmlToPlainText, parseTagDate } from '@/connectors/utils'
|
||||
import { htmlToPlainText, parseTagDate } from '@/connectors/utils'
|
||||
|
||||
const logger = createLogger('DropboxConnector')
|
||||
|
||||
@@ -33,6 +33,7 @@ interface DropboxFileEntry {
|
||||
client_modified?: string
|
||||
server_modified?: string
|
||||
size?: number
|
||||
content_hash?: string
|
||||
is_downloadable?: boolean
|
||||
}
|
||||
|
||||
@@ -76,37 +77,20 @@ async function downloadFileContent(accessToken: string, filePath: string): Promi
|
||||
return text
|
||||
}
|
||||
|
||||
async function fileToDocument(
|
||||
accessToken: string,
|
||||
entry: DropboxFileEntry
|
||||
): Promise<ExternalDocument | null> {
|
||||
try {
|
||||
const content = await downloadFileContent(accessToken, entry.path_lower)
|
||||
if (!content.trim()) {
|
||||
logger.info(`Skipping empty file: ${entry.name}`)
|
||||
return null
|
||||
}
|
||||
|
||||
const contentHash = await computeContentHash(content)
|
||||
|
||||
return {
|
||||
externalId: entry.id,
|
||||
title: entry.name,
|
||||
content,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: `https://www.dropbox.com/home${entry.path_display}`,
|
||||
contentHash,
|
||||
metadata: {
|
||||
path: entry.path_display,
|
||||
lastModified: entry.server_modified || entry.client_modified,
|
||||
fileSize: entry.size,
|
||||
},
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn(`Failed to extract content from file: ${entry.name}`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
return null
|
||||
/**
 * Builds a metadata-only ExternalDocument for a Dropbox file entry.
 *
 * Nothing is downloaded here: `content` is the empty string and
 * `contentDeferred` is set to true.
 *
 * @param entry - Dropbox file entry to describe.
 * @returns A stub whose `externalId` is the entry id and whose `title` is the entry name.
 */
function fileToStub(entry: DropboxFileEntry): ExternalDocument {
|
||||
return {
|
||||
externalId: entry.id,
|
||||
title: entry.name,
|
||||
content: '',
|
||||
contentDeferred: true,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: `https://www.dropbox.com/home${entry.path_display}`,
|
||||
// Hash is built from entry fields only: the entry id plus `content_hash`,
// falling back to `server_modified`, then to '' when both are null/undefined.
contentHash: `dropbox:${entry.id}:${entry.content_hash ?? entry.server_modified ?? ''}`,
|
||||
metadata: {
|
||||
path: entry.path_display,
|
||||
// `||` (not `??`) so an empty `server_modified` also falls through to `client_modified`.
lastModified: entry.server_modified || entry.client_modified,
|
||||
fileSize: entry.size,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -210,24 +194,19 @@ export const dropboxConnector: ConnectorConfig = {
|
||||
const maxFiles = sourceConfig.maxFiles ? Number(sourceConfig.maxFiles) : 0
|
||||
const previouslyFetched = (syncContext?.totalDocsFetched as number) ?? 0
|
||||
|
||||
const CONCURRENCY = 5
|
||||
const documents: ExternalDocument[] = []
|
||||
for (let i = 0; i < supportedFiles.length; i += CONCURRENCY) {
|
||||
if (maxFiles > 0 && previouslyFetched + documents.length >= maxFiles) break
|
||||
const batch = supportedFiles.slice(i, i + CONCURRENCY)
|
||||
const results = await Promise.all(batch.map((entry) => fileToDocument(accessToken, entry)))
|
||||
documents.push(...(results.filter(Boolean) as ExternalDocument[]))
|
||||
}
|
||||
let documents = supportedFiles.map(fileToStub)
|
||||
|
||||
if (maxFiles > 0) {
|
||||
const remaining = maxFiles - previouslyFetched
|
||||
if (documents.length > remaining) {
|
||||
documents.splice(remaining)
|
||||
documents = documents.slice(0, remaining)
|
||||
}
|
||||
}
|
||||
|
||||
const totalFetched = previouslyFetched + documents.length
|
||||
if (syncContext) syncContext.totalDocsFetched = totalFetched
|
||||
const hitLimit = maxFiles > 0 && totalFetched >= maxFiles
|
||||
if (hitLimit && syncContext) syncContext.listingCapped = true
|
||||
|
||||
return {
|
||||
documents,
|
||||
@@ -260,7 +239,11 @@ export const dropboxConnector: ConnectorConfig = {
|
||||
|
||||
if (!isSupportedFile(entry)) return null
|
||||
|
||||
return fileToDocument(accessToken, entry)
|
||||
const content = await downloadFileContent(accessToken, entry.path_lower)
|
||||
if (!content.trim()) return null
|
||||
|
||||
const stub = fileToStub(entry)
|
||||
return { ...stub, content, contentDeferred: false }
|
||||
} catch (error) {
|
||||
logger.warn(`Failed to fetch document ${externalId}`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
|
||||
@@ -11,7 +11,7 @@ import {
|
||||
TYPE_STRUCT,
|
||||
} from '@/app/api/tools/evernote/lib/thrift'
|
||||
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
|
||||
import { computeContentHash, htmlToPlainText, joinTagArray, parseTagDate } from '@/connectors/utils'
|
||||
import { htmlToPlainText, joinTagArray, parseTagDate } from '@/connectors/utils'
|
||||
|
||||
const logger = createLogger('EvernoteConnector')
|
||||
|
||||
@@ -231,7 +231,8 @@ async function apiFindNotesMetadata(
|
||||
token: string,
|
||||
offset: number,
|
||||
maxNotes: number,
|
||||
notebookGuid?: string
|
||||
notebookGuid?: string,
|
||||
retryOptions?: Parameters<typeof fetchWithRetry>[2]
|
||||
): Promise<{ totalNotes: number; notes: NoteMetadata[] }> {
|
||||
const w = new ThriftWriter()
|
||||
w.writeMessageBegin('findNotesMetadata', 0)
|
||||
@@ -256,7 +257,7 @@ async function apiFindNotesMetadata(
|
||||
|
||||
w.writeFieldStop()
|
||||
|
||||
const r = await callNoteStore(token, w)
|
||||
const r = await callNoteStore(token, w, retryOptions)
|
||||
|
||||
let totalNotes = 0
|
||||
const notes: NoteMetadata[] = []
|
||||
@@ -312,7 +313,11 @@ async function apiFindNotesMetadata(
|
||||
* 5:withResourcesRecognition, 6:withResourcesAlternateData)
|
||||
* Note: 1:guid, 2:title, 3:content, 6:created, 7:updated, 11:notebookGuid, 12:tagGuids
|
||||
*/
|
||||
async function apiGetNote(token: string, guid: string): Promise<Note> {
|
||||
async function apiGetNote(
|
||||
token: string,
|
||||
guid: string,
|
||||
retryOptions?: Parameters<typeof fetchWithRetry>[2]
|
||||
): Promise<Note> {
|
||||
const w = new ThriftWriter()
|
||||
w.writeMessageBegin('getNote', 0)
|
||||
w.writeStringField(1, token)
|
||||
@@ -323,7 +328,7 @@ async function apiGetNote(token: string, guid: string): Promise<Note> {
|
||||
w.writeBoolField(6, false) // withResourcesAlternateData
|
||||
w.writeFieldStop()
|
||||
|
||||
const r = await callNoteStore(token, w)
|
||||
const r = await callNoteStore(token, w, retryOptions)
|
||||
|
||||
let noteGuid = ''
|
||||
let title = ''
|
||||
@@ -388,13 +393,14 @@ export const evernoteConnector: ConnectorConfig = {
|
||||
syncContext?: Record<string, unknown>
|
||||
): Promise<ExternalDocumentList> => {
|
||||
const notebookGuid = (sourceConfig.notebookGuid as string) || undefined
|
||||
const retryOptions = { maxRetries: 3, initialDelayMs: 500 }
|
||||
|
||||
if (syncContext && !syncContext.tagMap) {
|
||||
const tags = await apiListTags(accessToken)
|
||||
const tags = await apiListTags(accessToken, retryOptions)
|
||||
syncContext.tagMap = Object.fromEntries(tags.map((t) => [t.guid, t.name]))
|
||||
}
|
||||
if (syncContext && !syncContext.notebookMap) {
|
||||
const notebooks = await apiListNotebooks(accessToken)
|
||||
const notebooks = await apiListNotebooks(accessToken, retryOptions)
|
||||
syncContext.notebookMap = Object.fromEntries(notebooks.map((nb) => [nb.guid, nb.name]))
|
||||
}
|
||||
|
||||
@@ -407,38 +413,33 @@ export const evernoteConnector: ConnectorConfig = {
|
||||
|
||||
logger.info('Listing Evernote notes', { offset, maxNotes: NOTES_PER_PAGE })
|
||||
|
||||
const result = await apiFindNotesMetadata(accessToken, offset, NOTES_PER_PAGE, notebookGuid)
|
||||
const result = await apiFindNotesMetadata(
|
||||
accessToken,
|
||||
offset,
|
||||
NOTES_PER_PAGE,
|
||||
notebookGuid,
|
||||
retryOptions
|
||||
)
|
||||
|
||||
const documents: ExternalDocument[] = []
|
||||
const documents: ExternalDocument[] = result.notes.map((meta) => {
|
||||
const tagNames = meta.tagGuids.map((g) => tagMap[g]).filter(Boolean)
|
||||
|
||||
for (const meta of result.notes) {
|
||||
try {
|
||||
const note = await apiGetNote(accessToken, meta.guid)
|
||||
const plainText = htmlToPlainText(note.content)
|
||||
const contentHash = await computeContentHash(plainText)
|
||||
const tagNames = note.tagGuids.map((g) => tagMap[g]).filter(Boolean)
|
||||
|
||||
documents.push({
|
||||
externalId: note.guid,
|
||||
title: note.title || 'Untitled',
|
||||
content: plainText,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: `https://${host}/shard/${shardId}/nl/${userId}/${note.guid}/`,
|
||||
contentHash,
|
||||
metadata: {
|
||||
tags: tagNames,
|
||||
notebook: notebookMap[note.notebookGuid] || '',
|
||||
createdAt: note.created ? new Date(note.created).toISOString() : undefined,
|
||||
updatedAt: note.updated ? new Date(note.updated).toISOString() : undefined,
|
||||
},
|
||||
})
|
||||
} catch (error) {
|
||||
logger.warn('Failed to fetch note content', {
|
||||
guid: meta.guid,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
return {
|
||||
externalId: meta.guid,
|
||||
title: meta.title || 'Untitled',
|
||||
content: '',
|
||||
contentDeferred: true,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: `https://${host}/shard/${shardId}/nl/${userId}/${meta.guid}/`,
|
||||
contentHash: `evernote:${meta.guid}:${meta.updated}`,
|
||||
metadata: {
|
||||
tags: tagNames,
|
||||
notebook: notebookMap[meta.notebookGuid] || '',
|
||||
createdAt: meta.created ? new Date(meta.created).toISOString() : undefined,
|
||||
updatedAt: meta.updated ? new Date(meta.updated).toISOString() : undefined,
|
||||
},
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
const nextOffset = offset + result.notes.length
|
||||
const hasMore = nextOffset < result.totalNotes
|
||||
@@ -457,19 +458,21 @@ export const evernoteConnector: ConnectorConfig = {
|
||||
syncContext?: Record<string, unknown>
|
||||
): Promise<ExternalDocument | null> => {
|
||||
try {
|
||||
const note = await apiGetNote(accessToken, externalId)
|
||||
const retryOptions = { maxRetries: 3, initialDelayMs: 500 }
|
||||
const note = await apiGetNote(accessToken, externalId, retryOptions)
|
||||
const plainText = htmlToPlainText(note.content)
|
||||
const contentHash = await computeContentHash(plainText)
|
||||
if (!plainText.trim()) return null
|
||||
|
||||
const shardId = extractShardId(accessToken)
|
||||
const userId = extractUserId(accessToken)
|
||||
const host = getHost(accessToken)
|
||||
|
||||
if (syncContext && !syncContext.tagMap) {
|
||||
const tags = await apiListTags(accessToken)
|
||||
const tags = await apiListTags(accessToken, retryOptions)
|
||||
syncContext.tagMap = Object.fromEntries(tags.map((t) => [t.guid, t.name]))
|
||||
}
|
||||
if (syncContext && !syncContext.notebookMap) {
|
||||
const notebooks = await apiListNotebooks(accessToken)
|
||||
const notebooks = await apiListNotebooks(accessToken, retryOptions)
|
||||
syncContext.notebookMap = Object.fromEntries(notebooks.map((nb) => [nb.guid, nb.name]))
|
||||
}
|
||||
|
||||
@@ -479,9 +482,9 @@ export const evernoteConnector: ConnectorConfig = {
|
||||
tagMap = syncContext.tagMap as Record<string, string>
|
||||
notebookMap = syncContext.notebookMap as Record<string, string>
|
||||
} else {
|
||||
const tags = await apiListTags(accessToken)
|
||||
const tags = await apiListTags(accessToken, retryOptions)
|
||||
tagMap = Object.fromEntries(tags.map((t) => [t.guid, t.name]))
|
||||
const notebooks = await apiListNotebooks(accessToken)
|
||||
const notebooks = await apiListNotebooks(accessToken, retryOptions)
|
||||
notebookMap = Object.fromEntries(notebooks.map((nb) => [nb.guid, nb.name]))
|
||||
}
|
||||
|
||||
@@ -492,9 +495,10 @@ export const evernoteConnector: ConnectorConfig = {
|
||||
externalId,
|
||||
title: note.title || 'Untitled',
|
||||
content: plainText,
|
||||
contentDeferred: false,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: `https://${host}/shard/${shardId}/nl/${userId}/${externalId}/`,
|
||||
contentHash,
|
||||
contentHash: `evernote:${note.guid}:${note.updated}`,
|
||||
metadata: {
|
||||
tags: tagNames,
|
||||
notebook: notebookName,
|
||||
|
||||
@@ -228,7 +228,8 @@ export const githubConnector: ConnectorConfig = {
|
||||
getDocument: async (
|
||||
accessToken: string,
|
||||
sourceConfig: Record<string, unknown>,
|
||||
externalId: string
|
||||
externalId: string,
|
||||
_syncContext?: Record<string, unknown>
|
||||
): Promise<ExternalDocument | null> => {
|
||||
const { owner, repo } = parseRepo(sourceConfig.repository as string)
|
||||
const branch = ((sourceConfig.branch as string) || 'main').trim()
|
||||
@@ -264,6 +265,7 @@ export const githubConnector: ConnectorConfig = {
|
||||
externalId,
|
||||
title: path.split('/').pop() || path,
|
||||
content,
|
||||
contentDeferred: false,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: `https://github.com/${owner}/${repo}/blob/${encodeURIComponent(branch)}/${path.split('/').map(encodeURIComponent).join('/')}`,
|
||||
contentHash: `${GIT_SHA_PREFIX}${data.sha as string}`,
|
||||
|
||||
@@ -2,14 +2,13 @@ import { createLogger } from '@sim/logger'
|
||||
import { GmailIcon } from '@/components/icons'
|
||||
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
|
||||
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
|
||||
import { computeContentHash, htmlToPlainText, parseTagDate } from '@/connectors/utils'
|
||||
import { htmlToPlainText, joinTagArray, parseTagDate } from '@/connectors/utils'
|
||||
|
||||
const logger = createLogger('GmailConnector')
|
||||
|
||||
const GMAIL_API_BASE = 'https://gmail.googleapis.com/gmail/v1/users/me'
|
||||
const DEFAULT_MAX_THREADS = 500
|
||||
const THREADS_PER_PAGE = 100
|
||||
const CONCURRENCY = 5
|
||||
|
||||
interface GmailHeader {
|
||||
name: string
|
||||
@@ -281,6 +280,27 @@ async function resolveLabelNames(
|
||||
.filter((name) => !name.startsWith('CATEGORY_') && name !== 'UNREAD')
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a lightweight document stub from a thread list entry.
|
||||
* Uses metadata-based contentHash for change detection without downloading content.
|
||||
*
* The returned stub has an empty `content`, `contentDeferred: true` and an empty
* `metadata` object; only `id`, `snippet` and `historyId` of the entry are read.
*
* @param thread - Thread list entry; `snippet` and `historyId` are optional.
* @returns A stub whose `externalId` is the thread id and whose `contentHash` is
*   `gmail:<id>:<historyId>` (the historyId part is '' when it is null/undefined).
*/
|
||||
function threadToStub(thread: {
|
||||
id: string
|
||||
snippet?: string
|
||||
historyId?: string
|
||||
}): ExternalDocument {
|
||||
return {
|
||||
externalId: thread.id,
|
||||
// The snippet stands in as the title; a missing or empty snippet yields the placeholder.
title: thread.snippet || 'Untitled Thread',
|
||||
content: '',
|
||||
contentDeferred: true,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: `https://mail.google.com/mail/u/0/#inbox/${thread.id}`,
|
||||
contentHash: `gmail:${thread.id}:${thread.historyId ?? ''}`,
|
||||
metadata: {},
|
||||
}
|
||||
}
|
||||
|
||||
export const gmailConnector: ConnectorConfig = {
|
||||
id: 'gmail',
|
||||
name: 'Gmail',
|
||||
@@ -421,57 +441,20 @@ export const gmailConnector: ConnectorConfig = {
|
||||
}
|
||||
|
||||
const data = await response.json()
|
||||
const threadStubs = (data.threads || []) as { id: string }[]
|
||||
const threads = (data.threads || []) as { id: string; snippet?: string; historyId?: string }[]
|
||||
|
||||
if (threadStubs.length === 0) {
|
||||
if (threads.length === 0) {
|
||||
return { documents: [], hasMore: false }
|
||||
}
|
||||
|
||||
// Fetch full threads with concurrency limit
|
||||
const documents: ExternalDocument[] = []
|
||||
for (let i = 0; i < threadStubs.length; i += CONCURRENCY) {
|
||||
const batch = threadStubs.slice(i, i + CONCURRENCY)
|
||||
const results = await Promise.all(
|
||||
batch.map(async (stub) => {
|
||||
try {
|
||||
const thread = await fetchThread(accessToken, stub.id)
|
||||
if (!thread) return null
|
||||
|
||||
const { content, subject, metadata } = formatThread(thread)
|
||||
if (!content.trim()) return null
|
||||
|
||||
// Resolve label names
|
||||
const labelIds = (metadata.labelIds as string[]) || []
|
||||
const labelNames = await resolveLabelNames(accessToken, labelIds, syncContext)
|
||||
metadata.labels = labelNames
|
||||
|
||||
const contentHash = await computeContentHash(content)
|
||||
|
||||
return {
|
||||
externalId: thread.id,
|
||||
title: subject,
|
||||
content,
|
||||
mimeType: 'text/plain' as const,
|
||||
sourceUrl: `https://mail.google.com/mail/u/0/#inbox/${thread.id}`,
|
||||
contentHash,
|
||||
metadata,
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn(`Failed to process thread ${stub.id}`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
return null
|
||||
}
|
||||
})
|
||||
)
|
||||
documents.push(...(results.filter(Boolean) as ExternalDocument[]))
|
||||
}
|
||||
const documents = threads.map(threadToStub)
|
||||
|
||||
const newTotal = totalFetched + documents.length
|
||||
if (syncContext) syncContext.totalThreadsFetched = newTotal
|
||||
|
||||
const nextPageToken = data.nextPageToken as string | undefined
|
||||
const hitLimit = newTotal >= maxThreads
|
||||
if (hitLimit && syncContext) syncContext.listingCapped = true
|
||||
|
||||
return {
|
||||
documents,
|
||||
@@ -497,15 +480,14 @@ export const gmailConnector: ConnectorConfig = {
|
||||
const labelNames = await resolveLabelNames(accessToken, labelIds, syncContext)
|
||||
metadata.labels = labelNames
|
||||
|
||||
const contentHash = await computeContentHash(content)
|
||||
|
||||
return {
|
||||
externalId: thread.id,
|
||||
title: subject,
|
||||
content,
|
||||
contentDeferred: false,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: `https://mail.google.com/mail/u/0/#inbox/${thread.id}`,
|
||||
contentHash,
|
||||
contentHash: `gmail:${thread.id}:${thread.historyId ?? ''}`,
|
||||
metadata,
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -630,9 +612,9 @@ export const gmailConnector: ConnectorConfig = {
|
||||
result.from = metadata.from
|
||||
}
|
||||
|
||||
const labels = Array.isArray(metadata.labels) ? (metadata.labels as string[]) : []
|
||||
if (labels.length > 0) {
|
||||
result.labels = labels.join(', ')
|
||||
const labels = joinTagArray(metadata.labels)
|
||||
if (labels) {
|
||||
result.labels = labels
|
||||
}
|
||||
|
||||
if (typeof metadata.messageCount === 'number') {
|
||||
|
||||
@@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { GoogleDriveIcon } from '@/components/icons'
|
||||
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
|
||||
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
|
||||
import { computeContentHash, htmlToPlainText, joinTagArray, parseTagDate } from '@/connectors/utils'
|
||||
import { htmlToPlainText, joinTagArray, parseTagDate } from '@/connectors/utils'
|
||||
|
||||
const logger = createLogger('GoogleDriveConnector')
|
||||
|
||||
@@ -68,8 +68,12 @@ async function downloadTextFile(accessToken: string, fileId: string): Promise<st
|
||||
}
|
||||
|
||||
const text = await response.text()
|
||||
if (text.length > MAX_EXPORT_SIZE) {
|
||||
return text.slice(0, MAX_EXPORT_SIZE)
|
||||
if (Buffer.byteLength(text, 'utf8') > MAX_EXPORT_SIZE) {
|
||||
logger.warn(`File exceeds ${MAX_EXPORT_SIZE} bytes, truncating`)
|
||||
const buf = Buffer.from(text, 'utf8')
|
||||
let end = MAX_EXPORT_SIZE
|
||||
while (end > 0 && (buf[end] & 0xc0) === 0x80) end--
|
||||
return buf.subarray(0, end).toString('utf8')
|
||||
}
|
||||
return text
|
||||
}
|
||||
@@ -143,40 +147,23 @@ function buildQuery(sourceConfig: Record<string, unknown>): string {
|
||||
return parts.join(' and ')
|
||||
}
|
||||
|
||||
async function fileToDocument(
|
||||
accessToken: string,
|
||||
file: DriveFile
|
||||
): Promise<ExternalDocument | null> {
|
||||
try {
|
||||
const content = await fetchFileContent(accessToken, file.id, file.mimeType)
|
||||
if (!content.trim()) {
|
||||
logger.info(`Skipping empty file: ${file.name} (${file.id})`)
|
||||
return null
|
||||
}
|
||||
|
||||
const contentHash = await computeContentHash(content)
|
||||
|
||||
return {
|
||||
externalId: file.id,
|
||||
title: file.name || 'Untitled',
|
||||
content,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: file.webViewLink || `https://drive.google.com/file/d/${file.id}/view`,
|
||||
contentHash,
|
||||
metadata: {
|
||||
originalMimeType: file.mimeType,
|
||||
modifiedTime: file.modifiedTime,
|
||||
createdTime: file.createdTime,
|
||||
owners: file.owners?.map((o) => o.displayName || o.emailAddress).filter(Boolean),
|
||||
starred: file.starred,
|
||||
fileSize: file.size ? Number(file.size) : undefined,
|
||||
},
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn(`Failed to extract content from file: ${file.name} (${file.id})`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
return null
|
||||
/**
 * Builds a metadata-only ExternalDocument for a Google Drive file.
 *
 * Nothing is downloaded here: `content` is the empty string and
 * `contentDeferred` is set to true.
 *
 * @param file - Drive file record to describe.
 * @returns A stub whose `externalId` is the file id and whose `contentHash` is
 *   `gdrive:<id>:<modifiedTime>` (the modifiedTime part is '' when it is null/undefined).
 */
function fileToStub(file: DriveFile): ExternalDocument {
|
||||
return {
|
||||
externalId: file.id,
|
||||
title: file.name || 'Untitled',
|
||||
content: '',
|
||||
contentDeferred: true,
|
||||
mimeType: 'text/plain',
|
||||
// Prefer the link carried on the file record; otherwise build a view URL from the id.
sourceUrl: file.webViewLink || `https://drive.google.com/file/d/${file.id}/view`,
|
||||
contentHash: `gdrive:${file.id}:${file.modifiedTime ?? ''}`,
|
||||
metadata: {
|
||||
originalMimeType: file.mimeType,
|
||||
modifiedTime: file.modifiedTime,
|
||||
createdTime: file.createdTime,
|
||||
// One label per owner: display name if set, else email address; falsy entries are dropped.
owners: file.owners?.map((o) => o.displayName || o.emailAddress).filter(Boolean),
|
||||
starred: file.starred,
|
||||
// `size` is converted with Number(); a falsy size is reported as undefined.
fileSize: file.size ? Number(file.size) : undefined,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -232,9 +219,20 @@ export const googleDriveConnector: ConnectorConfig = {
|
||||
const query = buildQuery(sourceConfig)
|
||||
const pageSize = 100
|
||||
|
||||
const maxFiles = sourceConfig.maxFiles ? Number(sourceConfig.maxFiles) : 0
|
||||
const previouslyFetched = (syncContext?.totalDocsFetched as number) ?? 0
|
||||
|
||||
if (maxFiles > 0 && previouslyFetched >= maxFiles) {
|
||||
return { documents: [], hasMore: false }
|
||||
}
|
||||
|
||||
const remaining = maxFiles > 0 ? maxFiles - previouslyFetched : 0
|
||||
const effectivePageSize = maxFiles > 0 ? Math.min(pageSize, remaining) : pageSize
|
||||
|
||||
const queryParams = new URLSearchParams({
|
||||
q: query,
|
||||
pageSize: String(pageSize),
|
||||
pageSize: String(effectivePageSize),
|
||||
orderBy: 'modifiedTime desc',
|
||||
fields:
|
||||
'nextPageToken,files(id,name,mimeType,modifiedTime,createdTime,webViewLink,parents,owners,size,starred)',
|
||||
supportsAllDrives: 'true',
|
||||
@@ -269,18 +267,14 @@ export const googleDriveConnector: ConnectorConfig = {
|
||||
const data = await response.json()
|
||||
const files = (data.files || []) as DriveFile[]
|
||||
|
||||
const CONCURRENCY = 5
|
||||
const documents: ExternalDocument[] = []
|
||||
for (let i = 0; i < files.length; i += CONCURRENCY) {
|
||||
const batch = files.slice(i, i + CONCURRENCY)
|
||||
const results = await Promise.all(batch.map((file) => fileToDocument(accessToken, file)))
|
||||
documents.push(...(results.filter(Boolean) as ExternalDocument[]))
|
||||
}
|
||||
const documents = files
|
||||
.filter((f) => isGoogleWorkspaceFile(f.mimeType) || isSupportedTextFile(f.mimeType))
|
||||
.map(fileToStub)
|
||||
|
||||
const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 0) + documents.length
|
||||
const totalFetched = previouslyFetched + documents.length
|
||||
if (syncContext) syncContext.totalDocsFetched = totalFetched
|
||||
const maxFiles = sourceConfig.maxFiles ? Number(sourceConfig.maxFiles) : 0
|
||||
const hitLimit = maxFiles > 0 && totalFetched >= maxFiles
|
||||
if (hitLimit && syncContext) syncContext.listingCapped = true
|
||||
|
||||
const nextPageToken = data.nextPageToken as string | undefined
|
||||
|
||||
@@ -317,7 +311,18 @@ export const googleDriveConnector: ConnectorConfig = {
|
||||
|
||||
if (file.trashed) return null
|
||||
|
||||
return fileToDocument(accessToken, file)
|
||||
try {
|
||||
const content = await fetchFileContent(accessToken, file.id, file.mimeType)
|
||||
if (!content.trim()) return null
|
||||
|
||||
const stub = fileToStub(file)
|
||||
return { ...stub, content, contentDeferred: false }
|
||||
} catch (error) {
|
||||
logger.warn(`Failed to fetch content for file: ${file.name} (${file.id})`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
return null
|
||||
}
|
||||
},
|
||||
|
||||
validateConfig: async (
|
||||
|
||||
@@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { NotionIcon } from '@/components/icons'
|
||||
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
|
||||
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
|
||||
import { computeContentHash, joinTagArray, parseTagDate } from '@/connectors/utils'
|
||||
import { joinTagArray, parseTagDate } from '@/connectors/utils'
|
||||
|
||||
const logger = createLogger('NotionConnector')
|
||||
|
||||
@@ -39,6 +39,18 @@ function blocksToPlainText(blocks: Record<string, unknown>[]): string {
|
||||
const blockData = block[type] as Record<string, unknown> | undefined
|
||||
if (!blockData) return ''
|
||||
|
||||
if (type === 'code') {
|
||||
const richText = blockData.rich_text as Record<string, unknown>[] | undefined
|
||||
const language = (blockData.language as string) || ''
|
||||
const code = richText ? richTextToPlain(richText) : ''
|
||||
return language ? `\`\`\`${language}\n${code}\n\`\`\`` : `\`\`\`\n${code}\n\`\`\``
|
||||
}
|
||||
|
||||
if (type === 'equation') {
|
||||
const expression = (blockData.expression as string) || ''
|
||||
return expression ? `$$${expression}$$` : ''
|
||||
}
|
||||
|
||||
const richText = blockData.rich_text as Record<string, unknown>[] | undefined
|
||||
if (!richText) return ''
|
||||
|
||||
@@ -135,32 +147,25 @@ function extractTags(properties: Record<string, unknown>): string[] {
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a Notion page to an ExternalDocument by fetching its block content.
|
||||
* Converts a Notion page to a lightweight metadata stub (no content fetching).
|
||||
*/
|
||||
async function pageToExternalDocument(
|
||||
accessToken: string,
|
||||
page: Record<string, unknown>
|
||||
): Promise<ExternalDocument> {
|
||||
function pageToStub(page: Record<string, unknown>): ExternalDocument {
|
||||
const pageId = page.id as string
|
||||
const properties = (page.properties || {}) as Record<string, unknown>
|
||||
const title = extractTitle(properties)
|
||||
const url = page.url as string
|
||||
const lastEditedTime = (page.last_edited_time as string) ?? ''
|
||||
|
||||
// Fetch page content
|
||||
const blocks = await fetchAllBlocks(accessToken, pageId)
|
||||
const plainText = blocksToPlainText(blocks)
|
||||
const contentHash = await computeContentHash(plainText)
|
||||
|
||||
// Extract tags from multi_select/select properties
|
||||
const tags = extractTags(properties)
|
||||
|
||||
return {
|
||||
externalId: pageId,
|
||||
title: title || 'Untitled',
|
||||
content: plainText,
|
||||
content: '',
|
||||
contentDeferred: true,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: url,
|
||||
contentHash,
|
||||
contentHash: `notion:${pageId}:${lastEditedTime}`,
|
||||
metadata: {
|
||||
tags,
|
||||
lastModified: page.last_edited_time as string,
|
||||
@@ -260,7 +265,8 @@ export const notionConnector: ConnectorConfig = {
|
||||
getDocument: async (
|
||||
accessToken: string,
|
||||
_sourceConfig: Record<string, unknown>,
|
||||
externalId: string
|
||||
externalId: string,
|
||||
_syncContext?: Record<string, unknown>
|
||||
): Promise<ExternalDocument | null> => {
|
||||
const response = await fetchWithRetry(`${NOTION_BASE_URL}/pages/${externalId}`, {
|
||||
method: 'GET',
|
||||
@@ -276,7 +282,20 @@ export const notionConnector: ConnectorConfig = {
|
||||
}
|
||||
|
||||
const page = await response.json()
|
||||
return pageToExternalDocument(accessToken, page)
|
||||
if (page.archived) return null
|
||||
|
||||
try {
|
||||
const blocks = await fetchAllBlocks(accessToken, externalId)
|
||||
const blockContent = blocksToPlainText(blocks)
|
||||
const stub = pageToStub(page)
|
||||
const content = blockContent.trim() || stub.title
|
||||
return { ...stub, content, contentDeferred: false }
|
||||
} catch (error) {
|
||||
logger.warn(`Failed to fetch content for Notion page: ${externalId}`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
return null
|
||||
}
|
||||
},
|
||||
|
||||
validateConfig: async (
|
||||
@@ -430,11 +449,12 @@ async function listFromWorkspace(
|
||||
const results = (data.results || []) as Record<string, unknown>[]
|
||||
const pages = results.filter((r) => r.object === 'page' && !(r.archived as boolean))
|
||||
|
||||
const documents = await processPages(accessToken, pages)
|
||||
const documents = pages.map(pageToStub)
|
||||
|
||||
const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 0) + documents.length
|
||||
if (syncContext) syncContext.totalDocsFetched = totalFetched
|
||||
const hitLimit = maxPages > 0 && totalFetched >= maxPages
|
||||
if (hitLimit && syncContext) syncContext.listingCapped = true
|
||||
|
||||
const nextCursor = hitLimit ? undefined : ((data.next_cursor as string) ?? undefined)
|
||||
|
||||
@@ -485,11 +505,12 @@ async function listFromDatabase(
|
||||
const results = (data.results || []) as Record<string, unknown>[]
|
||||
const pages = results.filter((r) => r.object === 'page' && !(r.archived as boolean))
|
||||
|
||||
const documents = await processPages(accessToken, pages)
|
||||
const documents = pages.map(pageToStub)
|
||||
|
||||
const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 0) + documents.length
|
||||
if (syncContext) syncContext.totalDocsFetched = totalFetched
|
||||
const hitLimit = maxPages > 0 && totalFetched >= maxPages
|
||||
if (hitLimit && syncContext) syncContext.listingCapped = true
|
||||
|
||||
const nextCursor = hitLimit ? undefined : ((data.next_cursor as string) ?? undefined)
|
||||
|
||||
@@ -504,7 +525,7 @@ async function listFromDatabase(
|
||||
* Lists child pages under a specific parent page.
|
||||
*
|
||||
* Uses the blocks children endpoint to find child_page blocks,
|
||||
* then fetches each page's content.
|
||||
* then fetches each page's metadata to build lightweight stubs.
|
||||
*/
|
||||
async function listFromParentPage(
|
||||
accessToken: string,
|
||||
@@ -536,15 +557,17 @@ async function listFromParentPage(
|
||||
}
|
||||
|
||||
const data = await response.json()
|
||||
const blocks = (data.results || []) as Record<string, unknown>[]
|
||||
const blockResults = (data.results || []) as Record<string, unknown>[]
|
||||
|
||||
// Filter to child_page blocks only (child_database blocks cannot be fetched via the Pages API)
|
||||
const childPageIds = blocks.filter((b) => b.type === 'child_page').map((b) => b.id as string)
|
||||
const childPageIds = blockResults
|
||||
.filter((b) => b.type === 'child_page')
|
||||
.map((b) => b.id as string)
|
||||
|
||||
// Also include the root page itself on the first call (no cursor)
|
||||
const pageIdsToFetch = !cursor ? [rootPageId, ...childPageIds] : childPageIds
|
||||
|
||||
// Fetch child pages in concurrent batches
|
||||
// Fetch page metadata (not content) in concurrent batches to build stubs
|
||||
const CHILD_PAGE_CONCURRENCY = 5
|
||||
|
||||
const documents: ExternalDocument[] = []
|
||||
@@ -568,7 +591,7 @@ async function listFromParentPage(
|
||||
}
|
||||
const page = await pageResponse.json()
|
||||
if (page.archived) return null
|
||||
return pageToExternalDocument(accessToken, page)
|
||||
return pageToStub(page)
|
||||
} catch (error) {
|
||||
logger.warn(`Failed to process child page ${pageId}`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
@@ -583,6 +606,7 @@ async function listFromParentPage(
|
||||
const totalFetched = ((syncContext?.totalDocsFetched as number) ?? 0) + documents.length
|
||||
if (syncContext) syncContext.totalDocsFetched = totalFetched
|
||||
const hitLimit = maxPages > 0 && totalFetched >= maxPages
|
||||
if (hitLimit && syncContext) syncContext.listingCapped = true
|
||||
|
||||
const nextCursor = hitLimit ? undefined : ((data.next_cursor as string) ?? undefined)
|
||||
|
||||
@@ -592,31 +616,3 @@ async function listFromParentPage(
|
||||
hasMore: hitLimit ? false : data.has_more === true,
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Turns a list of raw Notion page objects into ExternalDocuments.
 *
 * Pages are converted in groups of three at a time. A page whose conversion
 * throws is logged and left out of the result instead of failing the whole call;
 * falsy conversion results are dropped as well.
 *
 * @param accessToken - Token forwarded to `pageToExternalDocument` for each page.
 * @param pages - Raw Notion page objects to convert.
 * @returns The successfully converted documents, in input order.
 */
async function processPages(
  accessToken: string,
  pages: Record<string, unknown>[]
): Promise<ExternalDocument[]> {
  const CONCURRENCY = 3

  // Per-page conversion that never rejects: failures are logged and mapped to null.
  const convertOne = async (page: Record<string, unknown>) => {
    try {
      return await pageToExternalDocument(accessToken, page)
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error)
      logger.warn(`Failed to process Notion page ${page.id}`, {
        error: message,
      })
      return null
    }
  }

  const converted: ExternalDocument[] = []
  let start = 0
  while (start < pages.length) {
    const group = pages.slice(start, start + CONCURRENCY)
    const settled = await Promise.all(group.map((page) => convertOne(page)))
    for (const doc of settled) {
      // Skip pages that failed (null) or produced no document.
      if (doc) converted.push(doc as ExternalDocument)
    }
    start += CONCURRENCY
  }
  return converted
}
|
||||
|
||||
@@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { ObsidianIcon } from '@/components/icons'
|
||||
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
|
||||
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
|
||||
import { computeContentHash, joinTagArray, parseTagDate } from '@/connectors/utils'
|
||||
import { joinTagArray, parseTagDate } from '@/connectors/utils'
|
||||
|
||||
const logger = createLogger('ObsidianConnector')
|
||||
|
||||
@@ -24,7 +24,7 @@ interface NoteJson {
|
||||
* Normalizes the vault URL by removing trailing slashes.
|
||||
*/
|
||||
function normalizeVaultUrl(url: string): string {
  // Trim before stripping slashes: with surrounding whitespace the slash is not
  // the last character, so the /\/+$/ pattern would not match and the trailing
  // slash would survive into every request URL built from this value.
  return url.trim().replace(/\/+$/, '')
}
|
||||
|
||||
/**
|
||||
@@ -198,51 +198,20 @@ export const obsidianConnector: ConnectorConfig = {
|
||||
const offset = cursor ? Number(cursor) : 0
|
||||
const pageFiles = allFiles.slice(offset, offset + DOCS_PER_PAGE)
|
||||
|
||||
const documents: ExternalDocument[] = []
|
||||
const syncRunId = (syncContext?.syncRunId as string) ?? ''
|
||||
|
||||
const BATCH_SIZE = 5
|
||||
for (let i = 0; i < pageFiles.length; i += BATCH_SIZE) {
|
||||
const batch = pageFiles.slice(i, i + BATCH_SIZE)
|
||||
const results = await Promise.all(
|
||||
batch.map(async (filePath) => {
|
||||
try {
|
||||
const note = await fetchNote(baseUrl, accessToken, filePath)
|
||||
const content = note.content || ''
|
||||
const contentHash = await computeContentHash(content)
|
||||
|
||||
return {
|
||||
externalId: filePath,
|
||||
title: titleFromPath(filePath),
|
||||
content,
|
||||
mimeType: 'text/plain' as const,
|
||||
sourceUrl: `${baseUrl}/vault/${filePath.split('/').map(encodeURIComponent).join('/')}`,
|
||||
contentHash,
|
||||
metadata: {
|
||||
tags: note.tags,
|
||||
frontmatter: note.frontmatter,
|
||||
createdAt: note.stat?.ctime ? new Date(note.stat.ctime).toISOString() : undefined,
|
||||
modifiedAt: note.stat?.mtime ? new Date(note.stat.mtime).toISOString() : undefined,
|
||||
size: note.stat?.size,
|
||||
folder: filePath.includes('/')
|
||||
? filePath.substring(0, filePath.lastIndexOf('/'))
|
||||
: '',
|
||||
},
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn('Failed to fetch note', {
|
||||
filePath,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
return null
|
||||
}
|
||||
})
|
||||
)
|
||||
for (const doc of results) {
|
||||
if (doc) {
|
||||
documents.push(doc)
|
||||
}
|
||||
}
|
||||
}
|
||||
const documents: ExternalDocument[] = pageFiles.map((filePath) => ({
|
||||
externalId: filePath,
|
||||
title: titleFromPath(filePath),
|
||||
content: '',
|
||||
contentDeferred: true,
|
||||
mimeType: 'text/plain' as const,
|
||||
sourceUrl: `${baseUrl}/vault/${filePath.split('/').map(encodeURIComponent).join('/')}`,
|
||||
contentHash: `obsidian:stub:${filePath}:${syncRunId}`,
|
||||
metadata: {
|
||||
folder: filePath.includes('/') ? filePath.substring(0, filePath.lastIndexOf('/')) : '',
|
||||
},
|
||||
}))
|
||||
|
||||
const nextOffset = offset + pageFiles.length
|
||||
const hasMore = nextOffset < allFiles.length
|
||||
@@ -257,7 +226,8 @@ export const obsidianConnector: ConnectorConfig = {
|
||||
getDocument: async (
|
||||
accessToken: string,
|
||||
sourceConfig: Record<string, unknown>,
|
||||
externalId: string
|
||||
externalId: string,
|
||||
_syncContext?: Record<string, unknown>
|
||||
): Promise<ExternalDocument | null> => {
|
||||
const baseUrl = normalizeVaultUrl(
|
||||
(sourceConfig.vaultUrl as string) || 'https://127.0.0.1:27124'
|
||||
@@ -266,15 +236,15 @@ export const obsidianConnector: ConnectorConfig = {
|
||||
try {
|
||||
const note = await fetchNote(baseUrl, accessToken, externalId)
|
||||
const content = note.content || ''
|
||||
const contentHash = await computeContentHash(content)
|
||||
|
||||
return {
|
||||
externalId,
|
||||
title: titleFromPath(externalId),
|
||||
content,
|
||||
contentDeferred: false,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: `${baseUrl}/vault/${externalId.split('/').map(encodeURIComponent).join('/')}`,
|
||||
contentHash,
|
||||
contentHash: `obsidian:${externalId}:${note.stat?.mtime ?? ''}`,
|
||||
metadata: {
|
||||
tags: note.tags,
|
||||
frontmatter: note.frontmatter,
|
||||
@@ -329,14 +299,14 @@ export const obsidianConnector: ConnectorConfig = {
|
||||
|
||||
const folderPath = (sourceConfig.folderPath as string) || ''
|
||||
if (folderPath.trim()) {
|
||||
const files = await listVaultFiles(
|
||||
const entries = await listDirectory(
|
||||
baseUrl,
|
||||
accessToken,
|
||||
folderPath.trim(),
|
||||
VALIDATE_RETRY_OPTIONS
|
||||
)
|
||||
if (files.length === 0) {
|
||||
logger.info('Folder path returned no markdown files', { folderPath })
|
||||
if (entries.length === 0) {
|
||||
logger.info('Folder path returned no entries', { folderPath })
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { MicrosoftOneDriveIcon } from '@/components/icons'
|
||||
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
|
||||
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
|
||||
import { computeContentHash, htmlToPlainText, parseTagDate } from '@/connectors/utils'
|
||||
import { htmlToPlainText, parseTagDate } from '@/connectors/utils'
|
||||
|
||||
const logger = createLogger('OneDriveConnector')
|
||||
|
||||
@@ -96,42 +96,25 @@ async function fetchFileContent(
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a OneDrive item to an ExternalDocument.
|
||||
* Converts a OneDrive item to a lightweight metadata stub (no content).
|
||||
*/
|
||||
async function itemToDocument(
|
||||
accessToken: string,
|
||||
item: OneDriveItem
|
||||
): Promise<ExternalDocument | null> {
|
||||
try {
|
||||
const content = await fetchFileContent(accessToken, item.id, item.name)
|
||||
if (!content.trim()) {
|
||||
logger.info(`Skipping empty file: ${item.name} (${item.id})`)
|
||||
return null
|
||||
}
|
||||
|
||||
const contentHash = await computeContentHash(content)
|
||||
|
||||
return {
|
||||
externalId: item.id,
|
||||
title: item.name || 'Untitled',
|
||||
content,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: item.webUrl,
|
||||
contentHash,
|
||||
metadata: {
|
||||
name: item.name,
|
||||
lastModifiedDateTime: item.lastModifiedDateTime,
|
||||
createdBy: item.createdBy?.user?.displayName,
|
||||
size: item.size,
|
||||
webUrl: item.webUrl,
|
||||
parentPath: item.parentReference?.path,
|
||||
},
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn(`Failed to extract content from file: ${item.name} (${item.id})`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
return null
|
||||
function fileToStub(item: OneDriveItem): ExternalDocument {
  const { id, name, webUrl, lastModifiedDateTime, size } = item

  // Descriptive fields copied straight from the drive item.
  const metadata = {
    name,
    lastModifiedDateTime,
    createdBy: item.createdBy?.user?.displayName,
    size,
    webUrl,
    parentPath: item.parentReference?.path,
  }

  // No file content is read here: the body stays empty and is flagged as deferred.
  // The hash is derived from the item id and its last-modified timestamp only.
  const stub: ExternalDocument = {
    externalId: id,
    title: name || 'Untitled',
    content: '',
    contentDeferred: true,
    mimeType: 'text/plain',
    sourceUrl: webUrl,
    contentHash: `onedrive:${id}:${lastModifiedDateTime ?? ''}`,
    metadata,
  }
  return stub
}
|
||||
|
||||
@@ -199,7 +182,9 @@ export const onedriveConnector: ConnectorConfig = {
|
||||
pageUrl = cursor
|
||||
}
|
||||
} else {
|
||||
pageUrl = buildListUrl(folderPath)
|
||||
const baseUrl = buildListUrl(folderPath)
|
||||
const separator = baseUrl.includes('?') ? '&' : '?'
|
||||
pageUrl = `${baseUrl}${separator}$orderby=lastModifiedDateTime desc`
|
||||
}
|
||||
|
||||
logger.info('Listing OneDrive files', {
|
||||
@@ -242,24 +227,19 @@ export const onedriveConnector: ConnectorConfig = {
|
||||
const maxFiles = sourceConfig.maxFiles ? Number(sourceConfig.maxFiles) : 0
|
||||
const previouslyFetched = (syncContext?.totalDocsFetched as number) ?? 0
|
||||
|
||||
const CONCURRENCY = 5
|
||||
const documents: ExternalDocument[] = []
|
||||
for (let i = 0; i < textFiles.length; i += CONCURRENCY) {
|
||||
if (maxFiles > 0 && previouslyFetched + documents.length >= maxFiles) break
|
||||
const batch = textFiles.slice(i, i + CONCURRENCY)
|
||||
const results = await Promise.all(batch.map((item) => itemToDocument(accessToken, item)))
|
||||
documents.push(...(results.filter(Boolean) as ExternalDocument[]))
|
||||
}
|
||||
let documents = textFiles.map(fileToStub)
|
||||
|
||||
if (maxFiles > 0) {
|
||||
const remaining = maxFiles - previouslyFetched
|
||||
if (documents.length > remaining) {
|
||||
documents.splice(remaining)
|
||||
documents = documents.slice(0, remaining)
|
||||
}
|
||||
}
|
||||
|
||||
const totalFetched = previouslyFetched + documents.length
|
||||
if (syncContext) syncContext.totalDocsFetched = totalFetched
|
||||
const hitLimit = maxFiles > 0 && totalFetched >= maxFiles
|
||||
if (hitLimit && syncContext) syncContext.listingCapped = true
|
||||
|
||||
const nextLink = data['@odata.nextLink']
|
||||
|
||||
@@ -273,7 +253,7 @@ export const onedriveConnector: ConnectorConfig = {
|
||||
hasMore = true
|
||||
} else if (folderQueue.length > 0) {
|
||||
const nextFolderId = folderQueue.shift()!
|
||||
const nextUrl = `${GRAPH_BASE_URL}/me/drive/items/${nextFolderId}/children`
|
||||
const nextUrl = `${GRAPH_BASE_URL}/me/drive/items/${nextFolderId}/children?$orderby=lastModifiedDateTime desc`
|
||||
nextCursor = JSON.stringify({ pageUrl: nextUrl, folderQueue })
|
||||
hasMore = true
|
||||
}
|
||||
@@ -308,10 +288,20 @@ export const onedriveConnector: ConnectorConfig = {
|
||||
|
||||
const item = (await response.json()) as OneDriveItem
|
||||
|
||||
// Only process files with supported extensions
|
||||
if (!item.file || !isSupportedTextFile(item.name)) return null
|
||||
|
||||
return itemToDocument(accessToken, item)
|
||||
try {
|
||||
const content = await fetchFileContent(accessToken, item.id, item.name)
|
||||
if (!content.trim()) return null
|
||||
|
||||
const stub = fileToStub(item)
|
||||
return { ...stub, content, contentDeferred: false }
|
||||
} catch (error) {
|
||||
logger.warn(`Failed to fetch content for file: ${item.name} (${item.id})`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
return null
|
||||
}
|
||||
},
|
||||
|
||||
validateConfig: async (
|
||||
|
||||
@@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { MicrosoftSharepointIcon } from '@/components/icons'
|
||||
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
|
||||
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
|
||||
import { computeContentHash, htmlToPlainText, parseTagDate } from '@/connectors/utils'
|
||||
import { htmlToPlainText, parseTagDate } from '@/connectors/utils'
|
||||
|
||||
const logger = createLogger('SharePointConnector')
|
||||
|
||||
@@ -133,8 +133,12 @@ async function downloadFileContent(
|
||||
}
|
||||
|
||||
const text = await response.text()
|
||||
if (text.length > MAX_DOWNLOAD_SIZE) {
|
||||
return text.slice(0, MAX_DOWNLOAD_SIZE)
|
||||
if (Buffer.byteLength(text, 'utf8') > MAX_DOWNLOAD_SIZE) {
|
||||
logger.warn(`File "${fileName}" exceeds ${MAX_DOWNLOAD_SIZE} bytes, truncating`)
|
||||
const buf = Buffer.from(text, 'utf8')
|
||||
let end = MAX_DOWNLOAD_SIZE
|
||||
while (end > 0 && (buf[end] & 0xc0) === 0x80) end--
|
||||
return buf.subarray(0, end).toString('utf8')
|
||||
}
|
||||
return text
|
||||
}
|
||||
@@ -156,44 +160,25 @@ async function fetchFileContent(
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a DriveItem to an ExternalDocument by downloading its content.
|
||||
* Converts a DriveItem to a lightweight metadata stub (no content download).
|
||||
*/
|
||||
async function itemToDocument(
|
||||
accessToken: string,
|
||||
siteId: string,
|
||||
item: DriveItem,
|
||||
siteName: string
|
||||
): Promise<ExternalDocument | null> {
|
||||
try {
|
||||
const content = await fetchFileContent(accessToken, siteId, item.id, item.name)
|
||||
if (!content.trim()) {
|
||||
logger.info(`Skipping empty file: ${item.name} (${item.id})`)
|
||||
return null
|
||||
}
|
||||
|
||||
const contentHash = await computeContentHash(content)
|
||||
|
||||
return {
|
||||
externalId: item.id,
|
||||
title: item.name || 'Untitled',
|
||||
content,
|
||||
mimeType: 'text/plain',
|
||||
sourceUrl: item.webUrl,
|
||||
contentHash,
|
||||
metadata: {
|
||||
lastModifiedDateTime: item.lastModifiedDateTime,
|
||||
createdDateTime: item.createdDateTime,
|
||||
createdBy: item.createdBy?.user?.displayName,
|
||||
fileSize: item.size,
|
||||
path: item.parentReference?.path,
|
||||
siteName,
|
||||
},
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn(`Failed to extract content from file: ${item.name} (${item.id})`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
return null
|
||||
function itemToStub(item: DriveItem, siteName: string): ExternalDocument {
  const { id, name, webUrl, lastModifiedDateTime, createdDateTime, size } = item

  // Descriptive fields copied from the drive item, plus the owning site's name.
  const metadata = {
    lastModifiedDateTime,
    createdDateTime,
    createdBy: item.createdBy?.user?.displayName,
    fileSize: size,
    path: item.parentReference?.path,
    siteName,
  }

  // No download happens here: the body stays empty and is flagged as deferred.
  // The hash is derived from the item id and its last-modified timestamp only.
  const stub: ExternalDocument = {
    externalId: id,
    title: name || 'Untitled',
    content: '',
    contentDeferred: true,
    mimeType: 'text/plain',
    sourceUrl: webUrl,
    contentHash: `sharepoint:${id}:${lastModifiedDateTime ?? ''}`,
    metadata,
  }
  return stub
}
|
||||
|
||||
@@ -397,31 +382,19 @@ export const sharepointConnector: ConnectorConfig = {
|
||||
// Push subfolders onto the stack for depth-first traversal
|
||||
state.folderStack.push(...subfolders)
|
||||
|
||||
// Convert files to documents in batches
|
||||
const CONCURRENCY = 5
|
||||
// Convert files to lightweight stubs (no content download)
|
||||
const previouslyFetched = totalFetched
|
||||
for (let i = 0; i < files.length; i += CONCURRENCY) {
|
||||
for (const file of files) {
|
||||
if (maxFiles > 0 && previouslyFetched + documents.length >= maxFiles) break
|
||||
const batch = files.slice(i, i + CONCURRENCY)
|
||||
const results = await Promise.all(
|
||||
batch.map((file) => itemToDocument(accessToken, siteId, file, siteName))
|
||||
)
|
||||
documents.push(...(results.filter(Boolean) as ExternalDocument[]))
|
||||
documents.push(itemToStub(file, siteName))
|
||||
}
|
||||
|
||||
totalFetched += documents.length
|
||||
if (maxFiles > 0) {
|
||||
const remaining = maxFiles - previouslyFetched
|
||||
if (documents.length > remaining) {
|
||||
documents.splice(remaining)
|
||||
totalFetched = maxFiles
|
||||
}
|
||||
}
|
||||
|
||||
if (syncContext) syncContext.totalDocsFetched = totalFetched
|
||||
const hitLimit = maxFiles > 0 && totalFetched >= maxFiles
|
||||
if (hitLimit && syncContext) syncContext.listingCapped = true
|
||||
|
||||
// Determine next cursor
|
||||
if (hitLimit) {
|
||||
return { documents, hasMore: false }
|
||||
}
|
||||
@@ -492,7 +465,18 @@ export const sharepointConnector: ConnectorConfig = {
|
||||
return null
|
||||
}
|
||||
|
||||
return itemToDocument(accessToken, siteId, item, siteName ?? siteUrl)
|
||||
try {
|
||||
const content = await fetchFileContent(accessToken, siteId, item.id, item.name)
|
||||
if (!content.trim()) return null
|
||||
|
||||
const stub = itemToStub(item, siteName ?? siteUrl)
|
||||
return { ...stub, content, contentDeferred: false }
|
||||
} catch (error) {
|
||||
logger.warn(`Failed to fetch content for file: ${item.name} (${item.id})`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
return null
|
||||
}
|
||||
},
|
||||
|
||||
validateConfig: async (
|
||||
|
||||
@@ -292,7 +292,7 @@ export async function executeSync(
|
||||
const externalDocs: ExternalDocument[] = []
|
||||
let cursor: string | undefined
|
||||
let hasMore = true
|
||||
const syncContext: Record<string, unknown> = {}
|
||||
const syncContext: Record<string, unknown> = { syncRunId: crypto.randomUUID() }
|
||||
|
||||
// Determine if this sync should be incremental
|
||||
const isIncremental =
|
||||
@@ -401,10 +401,9 @@ export async function executeSync(
|
||||
|
||||
const seenExternalIds = new Set<string>()
|
||||
|
||||
const pendingProcessing: DocumentData[] = []
|
||||
|
||||
const pendingOps: DocOp[] = []
|
||||
for (const extDoc of externalDocs) {
|
||||
if (seenExternalIds.has(extDoc.externalId)) continue
|
||||
seenExternalIds.add(extDoc.externalId)
|
||||
|
||||
if (excludedExternalIds.has(extDoc.externalId)) {
|
||||
@@ -458,13 +457,24 @@ export async function executeSync(
|
||||
syncContext
|
||||
)
|
||||
if (!fullDoc?.content.trim()) return null
|
||||
const hydratedHash = fullDoc.contentHash ?? op.extDoc.contentHash
|
||||
if (
|
||||
op.type === 'update' &&
|
||||
existingByExternalId.get(op.extDoc.externalId)?.contentHash === hydratedHash
|
||||
) {
|
||||
result.docsUnchanged++
|
||||
return null
|
||||
}
|
||||
return {
|
||||
...op,
|
||||
extDoc: {
|
||||
...op.extDoc,
|
||||
title: fullDoc.title || op.extDoc.title,
|
||||
content: fullDoc.content,
|
||||
contentHash: fullDoc.contentHash ?? op.extDoc.contentHash,
|
||||
contentHash: hydratedHash,
|
||||
contentDeferred: false,
|
||||
sourceUrl: fullDoc.sourceUrl ?? op.extDoc.sourceUrl,
|
||||
metadata: { ...op.extDoc.metadata, ...fullDoc.metadata },
|
||||
},
|
||||
}
|
||||
})
|
||||
@@ -508,10 +518,11 @@ export async function executeSync(
|
||||
})
|
||||
)
|
||||
|
||||
const batchDocs: DocumentData[] = []
|
||||
for (let j = 0; j < settled.length; j++) {
|
||||
const outcome = settled[j]
|
||||
if (outcome.status === 'fulfilled') {
|
||||
pendingProcessing.push(outcome.value)
|
||||
batchDocs.push(outcome.value)
|
||||
if (batch[j].type === 'add') result.docsAdded++
|
||||
else result.docsUpdated++
|
||||
} else {
|
||||
@@ -524,17 +535,44 @@ export async function executeSync(
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if (batchDocs.length > 0) {
|
||||
try {
|
||||
await processDocumentsWithQueue(
|
||||
batchDocs,
|
||||
connector.knowledgeBaseId,
|
||||
{},
|
||||
crypto.randomUUID()
|
||||
)
|
||||
} catch (error) {
|
||||
logger.warn('Failed to enqueue batch for processing — will retry on next sync', {
|
||||
connectorId,
|
||||
count: batchDocs.length,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Skip deletion reconciliation during incremental syncs — results only contain changed docs
|
||||
if (!isIncremental && (options?.fullSync || connector.syncMode === 'full')) {
|
||||
// Reconcile deletions for non-incremental syncs that returned ALL docs.
|
||||
// Skip when listing was capped (maxFiles/maxThreads) — unseen docs may still exist in the source.
|
||||
if (!isIncremental && (!syncContext?.listingCapped || options?.fullSync)) {
|
||||
const removedIds = existingDocs
|
||||
.filter((d) => d.externalId && !seenExternalIds.has(d.externalId))
|
||||
.map((d) => d.id)
|
||||
|
||||
if (removedIds.length > 0) {
|
||||
await hardDeleteDocuments(removedIds, syncLogId)
|
||||
result.docsDeleted += removedIds.length
|
||||
const deletionRatio = existingDocs.length > 0 ? removedIds.length / existingDocs.length : 0
|
||||
|
||||
if (deletionRatio > 0.5 && removedIds.length > 5 && !options?.fullSync) {
|
||||
logger.warn(
|
||||
`Skipping deletion of ${removedIds.length}/${existingDocs.length} docs — exceeds safety threshold. Trigger a full sync to force cleanup.`,
|
||||
{ connectorId, deletionRatio: Math.round(deletionRatio * 100) }
|
||||
)
|
||||
} else {
|
||||
await hardDeleteDocuments(removedIds, syncLogId)
|
||||
result.docsDeleted += removedIds.length
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -608,24 +646,6 @@ export async function executeSync(
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue all added/updated documents for processing in a single batch
|
||||
if (pendingProcessing.length > 0) {
|
||||
try {
|
||||
await processDocumentsWithQueue(
|
||||
pendingProcessing,
|
||||
connector.knowledgeBaseId,
|
||||
{},
|
||||
crypto.randomUUID()
|
||||
)
|
||||
} catch (error) {
|
||||
logger.warn('Failed to enqueue documents for processing — will retry on next sync', {
|
||||
connectorId,
|
||||
count: pendingProcessing.length,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
await completeSyncLog(syncLogId, 'completed', result)
|
||||
|
||||
const [{ count: actualDocCount }] = await db
|
||||
|
||||
@@ -1190,6 +1190,7 @@ function getProviderAuthConfig(provider: string): ProviderAuthConfig {
|
||||
clientId,
|
||||
clientSecret,
|
||||
useBasicAuth: false,
|
||||
supportsRefreshTokenRotation: false,
|
||||
}
|
||||
}
|
||||
case 'slack': {
|
||||
|
||||
@@ -1,11 +1,22 @@
|
||||
export async function getConfluenceCloudId(domain: string, accessToken: string): Promise<string> {
|
||||
const response = await fetch('https://api.atlassian.com/oauth/token/accessible-resources', {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
Authorization: `Bearer ${accessToken}`,
|
||||
Accept: 'application/json',
|
||||
import type { RetryOptions } from '@/lib/knowledge/documents/utils'
|
||||
import { fetchWithRetry } from '@/lib/knowledge/documents/utils'
|
||||
|
||||
export async function getConfluenceCloudId(
|
||||
domain: string,
|
||||
accessToken: string,
|
||||
retryOptions?: RetryOptions
|
||||
): Promise<string> {
|
||||
const response = await fetchWithRetry(
|
||||
'https://api.atlassian.com/oauth/token/accessible-resources',
|
||||
{
|
||||
method: 'GET',
|
||||
headers: {
|
||||
Authorization: `Bearer ${accessToken}`,
|
||||
Accept: 'application/json',
|
||||
},
|
||||
},
|
||||
})
|
||||
retryOptions
|
||||
)
|
||||
|
||||
const resources = await response.json()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user