Native kb connectors

This commit is contained in:
Siddharth Ganesan
2026-03-05 20:17:20 -08:00
parent 9665f49492
commit a5b148e19e
6 changed files with 472 additions and 5 deletions

View File

@@ -1,6 +1,11 @@
import { db } from '@sim/db'
import { knowledgeConnector } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull } from 'drizzle-orm'
import { generateInternalToken } from '@/lib/auth/internal'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
import type { KnowledgeBaseArgs, KnowledgeBaseResult } from '@/lib/copilot/tools/shared/schemas'
import { getInternalApiBaseUrl } from '@/lib/core/utils/urls'
import { createSingleDocument, processDocumentAsync } from '@/lib/knowledge/documents/service'
import { generateSearchEmbedding } from '@/lib/knowledge/embeddings'
import {
@@ -543,10 +548,179 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
}
}
case 'add_connector': {
if (!args.knowledgeBaseId) {
return { success: false, message: 'Knowledge base ID is required for add_connector' }
}
if (!args.connectorType) {
return { success: false, message: 'connectorType is required for add_connector' }
}
if (!args.credentialId) {
return {
success: false,
message:
'credentialId is required for add_connector. Read environment/credentials.json to find credential IDs.',
}
}
const createBody: Record<string, unknown> = {
connectorType: args.connectorType,
credentialId: args.credentialId,
sourceConfig: args.sourceConfig ?? {},
syncIntervalMinutes: args.syncIntervalMinutes ?? 1440,
}
if (args.disabledTagIds?.length) {
;(createBody.sourceConfig as Record<string, unknown>).disabledTagIds =
args.disabledTagIds
}
const createRes = await connectorApiCall(
context.userId,
`/api/knowledge/${args.knowledgeBaseId}/connectors`,
'POST',
createBody
)
if (!createRes.success) {
return { success: false, message: createRes.error }
}
const connector = createRes.data
logger.info('Connector created via copilot', {
connectorId: connector.id,
connectorType: args.connectorType,
knowledgeBaseId: args.knowledgeBaseId,
userId: context.userId,
})
return {
success: true,
message: `Connector "${args.connectorType}" added to knowledge base. Initial sync started.`,
data: {
id: connector.id,
connectorType: connector.connectorType ?? connector.connector_type,
status: connector.status,
knowledgeBaseId: args.knowledgeBaseId,
},
}
}
case 'update_connector': {
if (!args.connectorId) {
return { success: false, message: 'connectorId is required for update_connector' }
}
const kbId = await resolveKnowledgeBaseId(args.connectorId)
if (!kbId) {
return { success: false, message: `Connector "${args.connectorId}" not found` }
}
const updateBody: Record<string, unknown> = {}
if (args.sourceConfig !== undefined) updateBody.sourceConfig = args.sourceConfig
if (args.syncIntervalMinutes !== undefined)
updateBody.syncIntervalMinutes = args.syncIntervalMinutes
if (args.connectorStatus !== undefined) updateBody.status = args.connectorStatus
if (Object.keys(updateBody).length === 0) {
return {
success: false,
message:
'At least one of sourceConfig, syncIntervalMinutes, or connectorStatus is required',
}
}
const updateRes = await connectorApiCall(
context.userId,
`/api/knowledge/${kbId}/connectors/${args.connectorId}`,
'PATCH',
updateBody
)
if (!updateRes.success) {
return { success: false, message: updateRes.error }
}
logger.info('Connector updated via copilot', {
connectorId: args.connectorId,
userId: context.userId,
})
return {
success: true,
message: 'Connector updated successfully',
data: { id: args.connectorId, ...updateBody },
}
}
case 'delete_connector': {
if (!args.connectorId) {
return { success: false, message: 'connectorId is required for delete_connector' }
}
const deleteKbId = await resolveKnowledgeBaseId(args.connectorId)
if (!deleteKbId) {
return { success: false, message: `Connector "${args.connectorId}" not found` }
}
const deleteRes = await connectorApiCall(
context.userId,
`/api/knowledge/${deleteKbId}/connectors/${args.connectorId}`,
'DELETE'
)
if (!deleteRes.success) {
return { success: false, message: deleteRes.error }
}
logger.info('Connector deleted via copilot', {
connectorId: args.connectorId,
userId: context.userId,
})
return {
success: true,
message: 'Connector deleted successfully. Associated documents have been removed.',
data: { id: args.connectorId },
}
}
case 'sync_connector': {
if (!args.connectorId) {
return { success: false, message: 'connectorId is required for sync_connector' }
}
const syncKbId = await resolveKnowledgeBaseId(args.connectorId)
if (!syncKbId) {
return { success: false, message: `Connector "${args.connectorId}" not found` }
}
const syncRes = await connectorApiCall(
context.userId,
`/api/knowledge/${syncKbId}/connectors/${args.connectorId}/sync`,
'POST'
)
if (!syncRes.success) {
return { success: false, message: syncRes.error }
}
logger.info('Connector sync triggered via copilot', {
connectorId: args.connectorId,
userId: context.userId,
})
return {
success: true,
message: 'Sync triggered. Documents will be updated in the background.',
data: { id: args.connectorId },
}
}
default:
return {
success: false,
message: `Unknown operation: ${operation}. Supported operations: create, get, query, add_file, update, delete, list_tags, create_tag, update_tag, delete_tag, get_tag_usage`,
message: `Unknown operation: ${operation}. Supported operations: create, get, query, add_file, update, delete, list_tags, create_tag, update_tag, delete_tag, get_tag_usage, add_connector, update_connector, delete_connector, sync_connector`,
}
}
} catch (error) {
@@ -564,3 +738,43 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
}
},
}
/**
 * Call an internal knowledge-connector API route on behalf of a user.
 *
 * Generates an internal token for `userId`, sends `method` to `path` (appended to the
 * internal API base URL) with an optional JSON body, and normalizes the response into
 * a success/failure result.
 *
 * @param userId - User the internal token is generated for.
 * @param path - Route path appended to the internal API base URL.
 * @param method - HTTP method to use.
 * @param body - Optional payload; JSON-encoded and sent only when provided.
 * @returns `{ success: true, data }` (the `data` field of the response JSON) when the
 *   response status is OK, otherwise `{ success: false, error }` where `error` is always
 *   a string.
 *
 * Rejections from `fetch` itself (e.g. network failures) are not caught here and
 * propagate to the caller.
 */
async function connectorApiCall(
  userId: string,
  path: string,
  method: string,
  body?: Record<string, unknown>
): Promise<{ success: boolean; data?: any; error?: string }> {
  const token = await generateInternalToken(userId)
  const baseUrl = getInternalApiBaseUrl()
  const res = await fetch(`${baseUrl}${path}`, {
    method,
    headers: {
      'Content-Type': 'application/json',
      Authorization: `Bearer ${token}`,
    },
    ...(body ? { body: JSON.stringify(body) } : {}),
  })

  // An empty or non-JSON body is tolerated. A body that parses to `null` or a primitive
  // is treated as an empty object so the property reads below can never throw.
  const parsed: unknown = await res.json().catch(() => null)
  const json: Record<string, unknown> =
    typeof parsed === 'object' && parsed !== null ? (parsed as Record<string, unknown>) : {}

  if (!res.ok) {
    const apiError = json.error
    let error = `API returned ${res.status}`
    if (typeof apiError === 'string' && apiError.length > 0) {
      error = apiError
    } else if (apiError) {
      // Structured error payloads are stringified so `error` stays a string for callers
      // that surface it directly as a message.
      error = `API returned ${res.status}: ${JSON.stringify(apiError)}`
    }
    return { success: false, error }
  }
  return { success: true, data: json.data }
}
/**
 * Look up the knowledge base that owns a connector.
 *
 * Only connector rows whose `deletedAt` is null are considered.
 *
 * @param connectorId - ID of the connector to look up.
 * @returns The owning knowledge base ID, or `null` when no matching row exists.
 */
async function resolveKnowledgeBaseId(connectorId: string): Promise<string | null> {
  const liveConnectorWithId = and(
    eq(knowledgeConnector.id, connectorId),
    isNull(knowledgeConnector.deletedAt)
  )
  const [match] = await db
    .select({ knowledgeBaseId: knowledgeConnector.knowledgeBaseId })
    .from(knowledgeConnector)
    .where(liveConnectorWithId)
    .limit(1)
  return match?.knowledgeBaseId ?? null
}

View File

@@ -33,6 +33,10 @@ export const KnowledgeBaseArgsSchema = z.object({
'update_tag',
'delete_tag',
'get_tag_usage',
'add_connector',
'update_connector',
'delete_connector',
'sync_connector',
]),
args: z
.object({
@@ -42,7 +46,7 @@ export const KnowledgeBaseArgsSchema = z.object({
description: z.string().optional(),
/** Workspace ID to associate with (required for create, optional for list) */
workspaceId: z.string().optional(),
/** Knowledge base ID (required for get, query, add_file, list_tags, create_tag, get_tag_usage) */
/** Knowledge base ID (required for get, query, add_file, list_tags, create_tag, get_tag_usage, add_connector) */
knowledgeBaseId: z.string().optional(),
/** Workspace file path to add as a document (required for add_file). Example: "files/report.pdf" */
filePath: z.string().optional(),
@@ -64,6 +68,20 @@ export const KnowledgeBaseArgsSchema = z.object({
tagDisplayName: z.string().optional(),
/** Tag field type: text, number, date, boolean (optional for create_tag, defaults to text) */
tagFieldType: z.enum(['text', 'number', 'date', 'boolean']).optional(),
/** Connector type from registry, e.g. "confluence" (required for add_connector) */
connectorType: z.string().optional(),
/** OAuth credential ID from environment/credentials.json (required for add_connector) */
credentialId: z.string().optional(),
/** Connector-specific config matching the schema in knowledgebases/connectors/{type}.json */
sourceConfig: z.record(z.unknown()).optional(),
/** Sync interval: 60, 360, 1440, 10080, or 0 for manual only (optional for add_connector, defaults to 1440) */
syncIntervalMinutes: z.number().int().min(0).optional(),
/** Connector ID (required for update_connector, delete_connector, sync_connector) */
connectorId: z.string().optional(),
/** Connector status: "active" or "paused" (optional for update_connector) */
connectorStatus: z.enum(['active', 'paused']).optional(),
/** Tag definition IDs to disable (optional for add_connector) */
disabledTagIds: z.array(z.string()).optional(),
})
.optional(),
})

View File

@@ -75,6 +75,7 @@ export function serializeKBMeta(kb: {
createdAt: Date
updatedAt: Date
documentCount: number
connectorTypes?: string[]
}): string {
return JSON.stringify(
{
@@ -85,6 +86,8 @@ export function serializeKBMeta(kb: {
embeddingDimension: kb.embeddingDimension,
tokenCount: kb.tokenCount,
documentCount: kb.documentCount,
connectorTypes:
kb.connectorTypes && kb.connectorTypes.length > 0 ? kb.connectorTypes : undefined,
createdAt: kb.createdAt.toISOString(),
updatedAt: kb.updatedAt.toISOString(),
},
@@ -126,6 +129,140 @@ export function serializeDocuments(
)
}
/**
 * Render a knowledge base's connectors as pretty-printed JSON for the VFS file
 * knowledgebases/{name}/connectors.json.
 *
 * Each entry carries the connector type, sync status, and schedule. Credentials and
 * source config are not part of the output. Null timestamps, a null document count,
 * and an empty/null sync error are omitted from the resulting JSON.
 *
 * @param connectors - Connector rows to serialize.
 * @returns A JSON array string indented with two spaces.
 */
export function serializeConnectors(
  connectors: Array<{
    id: string
    connectorType: string
    status: string
    syncMode: string
    syncIntervalMinutes: number
    lastSyncAt: Date | null
    lastSyncError: string | null
    lastSyncDocCount: number | null
    nextSyncAt: Date | null
    consecutiveFailures: number
    createdAt: Date
  }>
): string {
  // `undefined` values are dropped by JSON.stringify, which is how optional fields vanish.
  const toIso = (value: Date | null): string | undefined =>
    value ? value.toISOString() : undefined

  const summaries: Array<Record<string, unknown>> = []
  for (const connector of connectors) {
    summaries.push({
      id: connector.id,
      connectorType: connector.connectorType,
      status: connector.status,
      syncMode: connector.syncMode,
      syncIntervalMinutes: connector.syncIntervalMinutes,
      lastSyncAt: toIso(connector.lastSyncAt),
      lastSyncError: connector.lastSyncError ? connector.lastSyncError : undefined,
      lastSyncDocCount: connector.lastSyncDocCount ?? undefined,
      nextSyncAt: toIso(connector.nextSyncAt),
      consecutiveFailures: connector.consecutiveFailures,
      createdAt: connector.createdAt.toISOString(),
    })
  }
  return JSON.stringify(summaries, null, 2)
}
/**
 * Connector config field shape (mirrors ConnectorConfigField from connectors/types.ts
 * but avoids importing React-dependent code into serializers).
 *
 * One entry of a connector's `configFields` list. `id`, `title`, and `type` are always
 * written by `serializeConnectorSchema`; the optional members are written only when set.
 */
interface SerializableConfigField {
/** Field identifier, copied unchanged into the serialized schema. */
id: string
/** Field title, copied unchanged into the serialized schema. */
title: string
/** Field type name (free-form string here; valid values are defined by the connector registry — confirm there). */
type: string
/** Optional placeholder text; serialized only when non-empty. */
placeholder?: string
/** Whether the field is required; serialized as `required: true` only when truthy. */
required?: boolean
/** Optional field description; serialized only when non-empty. */
description?: string
/** Optional list of label/id choices; serialized only when present. */
options?: Array<{ label: string; id: string }>
}
/**
 * Tag definition entry carried in a connector config's `tagDefinitions` list and
 * emitted unchanged by `serializeConnectorSchema`.
 */
interface SerializableTagDef {
/** Tag definition identifier. */
id: string
/** Name of the tag as displayed (presumably to end users — confirm against the connector registry). */
displayName: string
/** Tag field type as a free-form string (presumably text/number/date/boolean — TODO confirm). */
fieldType: string
}
/**
 * Connector type description consumed by `serializeConnectorSchema` and
 * `serializeConnectorOverview`.
 */
interface SerializableConnectorConfig {
/** Connector type identifier; shown in the overview table's "Type" column. */
id: string
/** Connector name; shown in the overview table's "Name" column. */
name: string
/** Connector description; included in the per-connector schema output. */
description: string
/** Connector version string; included in the per-connector schema output. */
version: string
/** OAuth provider and optional scopes; missing scopes serialize as `[]` in the schema and `(none)` in the overview. */
oauth: { provider: string; requiredScopes?: string[] }
/** Config fields describing the connector's configuration inputs. */
configFields: SerializableConfigField[]
/** Optional tag definitions; serialized as `[]` when absent. */
tagDefinitions?: SerializableTagDef[]
/** Optional incremental-sync flag; serialized as `false` when absent. */
supportsIncrementalSync?: boolean
}
/**
 * Produce the pretty-printed JSON schema for one connector type, used for the VFS file
 * knowledgebases/connectors/{type}.json.
 *
 * The output holds the connector's identity, OAuth requirements, config fields, tag
 * definitions, and incremental-sync flag. Optional values are defaulted
 * (`requiredScopes` and `tagDefinitions` to `[]`, `supportsIncrementalSync` to `false`),
 * and optional config-field attributes are emitted only when they are set.
 *
 * @param connector - Connector description to serialize.
 * @returns A JSON object string indented with two spaces.
 */
export function serializeConnectorSchema(connector: SerializableConnectorConfig): string {
  const { oauth, configFields, tagDefinitions, supportsIncrementalSync } = connector

  // Conditional spreads keep the key order id, title, type, required, placeholder,
  // description, options while leaving unset attributes out entirely.
  const serializedFields = configFields.map((configField) => ({
    id: configField.id,
    title: configField.title,
    type: configField.type,
    ...(configField.required ? { required: true } : {}),
    ...(configField.placeholder ? { placeholder: configField.placeholder } : {}),
    ...(configField.description ? { description: configField.description } : {}),
    ...(configField.options ? { options: configField.options } : {}),
  }))

  const schema = {
    id: connector.id,
    name: connector.name,
    description: connector.description,
    version: connector.version,
    oauth: {
      provider: oauth.provider,
      requiredScopes: oauth.requiredScopes ?? [],
    },
    configFields: serializedFields,
    tagDefinitions: tagDefinitions ?? [],
    supportsIncrementalSync: supportsIncrementalSync ?? false,
  }

  return JSON.stringify(schema, null, 2)
}
/**
 * Build the markdown overview written to knowledgebases/connectors/connectors.md.
 *
 * The document is a table with one row per connector type (type id, name, OAuth
 * provider, required scopes) framed by short usage notes that point at the
 * per-connector schema files and at environment/credentials.json. Connectors without
 * scopes show `(none)` in the scopes column.
 *
 * @param connectors - Connector descriptions to list, in output order.
 * @returns The markdown document as a newline-joined string.
 */
export function serializeConnectorOverview(
  connectors: SerializableConnectorConfig[]
): string {
  const tableRows: string[] = []
  for (const { id, name, oauth } of connectors) {
    const scopeList = oauth.requiredScopes ?? []
    const scopeCell = scopeList.length > 0 ? scopeList.join(', ') : '(none)'
    tableRows.push(`| ${id} | ${name} | ${oauth.provider} | ${scopeCell} |`)
  }

  const heading = [
    '# Available KB Connectors',
    '',
    'Use `read("knowledgebases/connectors/{type}.json")` to get the full config schema before calling `add_connector`.',
    '',
    '| Type | Name | OAuth Provider | Required Scopes |',
    '|------|------|---------------|-----------------|',
  ]
  const footer = [
    '',
    'To add a connector, the user must have an OAuth credential for that provider.',
    'Check `environment/credentials.json` for available credential IDs.',
  ]

  return heading.concat(tableRows, footer).join('\n')
}
/**
* Serialize workspace file metadata for VFS files/{name}/meta.json
*/
@@ -280,6 +417,7 @@ export function serializeBlockSchema(block: BlockConfig): string {
*/
export function serializeCredentials(
accounts: Array<{
id?: string
providerId: string
scope: string | null
createdAt: Date
@@ -287,6 +425,7 @@ export function serializeCredentials(
): string {
return JSON.stringify(
accounts.map((a) => ({
id: a.id || undefined,
provider: a.providerId,
scope: a.scope || undefined,
connectedAt: a.createdAt.toISOString(),

View File

@@ -5,6 +5,7 @@ import {
copilotChats,
document,
form,
knowledgeConnector,
mcpServers as mcpServersTable,
workflowDeploymentVersion,
workflowExecutionLogs,
@@ -21,6 +22,9 @@ import type { DeploymentData } from '@/lib/copilot/vfs/serializers'
import {
serializeApiKeys,
serializeBlockSchema,
serializeConnectorOverview,
serializeConnectors,
serializeConnectorSchema,
serializeCredentials,
serializeCustomTool,
serializeDeployments,
@@ -59,6 +63,7 @@ import { listSkills } from '@/lib/workflows/skills/operations'
import { listWorkflows } from '@/lib/workflows/utils'
import { getUsersWithPermissions, getWorkspaceWithOwner } from '@/lib/workspaces/permissions/utils'
import { getAllBlocks } from '@/blocks/registry'
import { CONNECTOR_REGISTRY } from '@/connectors/registry'
import { tools as toolRegistry } from '@/tools/registry'
import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils'
@@ -182,10 +187,27 @@ function getStaticComponentFiles(): Map<string, string> {
)
)
const connectorConfigs = Object.values(CONNECTOR_REGISTRY).map((c) => ({
id: c.id,
name: c.name,
description: c.description,
version: c.version,
oauth: { provider: c.oauth.provider, requiredScopes: c.oauth.requiredScopes },
configFields: c.configFields,
tagDefinitions: c.tagDefinitions,
supportsIncrementalSync: c.supportsIncrementalSync,
}))
files.set('knowledgebases/connectors/connectors.md', serializeConnectorOverview(connectorConfigs))
for (const cc of connectorConfigs) {
files.set(`knowledgebases/connectors/${cc.id}.json`, serializeConnectorSchema(cc))
}
logger.info('Static component files built', {
blocks: visibleBlocks.length,
blocksFiltered,
integrations: integrationCount,
connectors: connectorConfigs.length,
})
staticComponentFiles = files
@@ -203,6 +225,7 @@ function getStaticComponentFiles(): Map<string, string> {
* workflows/{name}/deployment.json
* knowledgebases/{name}/meta.json
* knowledgebases/{name}/documents.json
* knowledgebases/{name}/connectors.json
* tables/{name}/meta.json
* files/{name}/meta.json
* jobs/{title}/meta.json
@@ -212,6 +235,8 @@ function getStaticComponentFiles(): Map<string, string> {
* environment/credentials.json
* environment/api-keys.json
* environment/variables.json
* knowledgebases/connectors/connectors.md (available connector types overview)
* knowledgebases/connectors/{type}.json (per-connector config schema)
* components/blocks/{type}.json
* components/integrations/{service}/{operation}.json
*/
@@ -521,6 +546,7 @@ export class WorkspaceVFS {
createdAt: kb.createdAt,
updatedAt: kb.updatedAt,
documentCount: kb.docCount,
connectorTypes: kb.connectorTypes,
})
)
@@ -549,6 +575,39 @@ export class WorkspaceVFS {
error: err instanceof Error ? err.message : String(err),
})
}
try {
const connectorRows = await db
.select({
id: knowledgeConnector.id,
connectorType: knowledgeConnector.connectorType,
status: knowledgeConnector.status,
syncMode: knowledgeConnector.syncMode,
syncIntervalMinutes: knowledgeConnector.syncIntervalMinutes,
lastSyncAt: knowledgeConnector.lastSyncAt,
lastSyncError: knowledgeConnector.lastSyncError,
lastSyncDocCount: knowledgeConnector.lastSyncDocCount,
nextSyncAt: knowledgeConnector.nextSyncAt,
consecutiveFailures: knowledgeConnector.consecutiveFailures,
createdAt: knowledgeConnector.createdAt,
})
.from(knowledgeConnector)
.where(
and(
eq(knowledgeConnector.knowledgeBaseId, kb.id),
isNull(knowledgeConnector.deletedAt)
)
)
if (connectorRows.length > 0) {
this.files.set(`${prefix}connectors.json`, serializeConnectors(connectorRows))
}
} catch (err) {
logger.warn('Failed to load KB connectors', {
knowledgeBaseId: kb.id,
error: err instanceof Error ? err.message : String(err),
})
}
})
)
@@ -556,6 +615,7 @@ export class WorkspaceVFS {
id: kb.id,
name: kb.name,
description: kb.description,
connectorTypes: kb.connectorTypes.length > 0 ? kb.connectorTypes : undefined,
}))
}
@@ -1023,6 +1083,7 @@ export class WorkspaceVFS {
createdAt: c.updatedAt,
})),
...oauthCredentials.map((c) => ({
id: c.id,
providerId: c.providerId,
scope: null,
createdAt: c.updatedAt,

View File

@@ -2,6 +2,7 @@ import { db } from '@sim/db'
import {
copilotChats,
knowledgeBase,
knowledgeConnector,
mcpServers,
userTableDefinitions,
userTableRows,
@@ -10,7 +11,7 @@ import {
workspace,
} from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, count, desc, eq, isNull } from 'drizzle-orm'
import { and, count, desc, eq, inArray, isNull } from 'drizzle-orm'
import { getAccessibleOAuthCredentials } from '@/lib/credentials/environment'
import { listWorkspaceFiles } from '@/lib/uploads/contexts/workspace'
import { listCustomTools } from '@/lib/workflows/custom-tools/operations'
@@ -29,7 +30,12 @@ export interface WorkspaceMdData {
isDeployed: boolean
lastRunAt?: Date | null
}>
knowledgeBases: Array<{ id: string; name: string; description?: string | null }>
knowledgeBases: Array<{
id: string
name: string
description?: string | null
connectorTypes?: string[]
}>
tables: Array<{ id: string; name: string; description?: string | null; rowCount: number }>
files: Array<{ name: string; type: string; size: number }>
credentials: Array<{ providerId: string }>
@@ -88,6 +94,9 @@ export function buildWorkspaceMd(data: WorkspaceMdData): string {
const lines = data.knowledgeBases.map((kb) => {
let line = `- **${kb.name}** (${kb.id})`
if (kb.description) line += `${kb.description}`
if (kb.connectorTypes && kb.connectorTypes.length > 0) {
line += ` | connectors: ${kb.connectorTypes.join(', ')}`
}
return line
})
sections.push(`## Knowledge Bases (${data.knowledgeBases.length})\n${lines.join('\n')}`)
@@ -292,11 +301,34 @@ export async function generateWorkspaceContext(
)
: []
const kbIds = kbs.map((kb) => kb.id)
const connectorRows =
kbIds.length > 0
? await db
.select({
knowledgeBaseId: knowledgeConnector.knowledgeBaseId,
connectorType: knowledgeConnector.connectorType,
})
.from(knowledgeConnector)
.where(and(inArray(knowledgeConnector.knowledgeBaseId, kbIds), isNull(knowledgeConnector.deletedAt)))
: []
const connectorTypesByKb = new Map<string, string[]>()
for (const row of connectorRows) {
const types = connectorTypesByKb.get(row.knowledgeBaseId) ?? []
if (!types.includes(row.connectorType)) {
types.push(row.connectorType)
}
connectorTypesByKb.set(row.knowledgeBaseId, types)
}
return buildWorkspaceMd({
workspace: wsRow,
members,
workflows,
knowledgeBases: kbs,
knowledgeBases: kbs.map((kb) => ({
...kb,
connectorTypes: connectorTypesByKb.get(kb.id),
})),
tables: tables.map((t, i) => ({ ...t, rowCount: rowCounts[i] ?? 0 })),
files: files.map((f) => ({ name: f.name, type: f.type, size: f.size })),
credentials: credentials.map((c) => ({ providerId: c.providerId })),

View File

@@ -360,6 +360,7 @@ export async function getAccessibleEnvCredentials(
}
export interface AccessibleOAuthCredential {
id: string
providerId: string
displayName: string
updatedAt: Date
@@ -371,6 +372,7 @@ export async function getAccessibleOAuthCredentials(
): Promise<AccessibleOAuthCredential[]> {
const rows = await db
.select({
id: credential.id,
providerId: credential.providerId,
displayName: credential.displayName,
updatedAt: credential.updatedAt,
@@ -389,6 +391,7 @@ export async function getAccessibleOAuthCredentials(
return rows
.filter((row): row is AccessibleOAuthCredential => Boolean(row.providerId))
.map((row) => ({
id: row.id,
providerId: row.providerId!,
displayName: row.displayName,
updatedAt: row.updatedAt,