Compare commits

...

1 Commits

Author SHA1 Message Date
Siddharth Ganesan
539900e672 Fix mothership csv imports, add tools 2026-03-17 16:34:18 -07:00
5 changed files with 519 additions and 137 deletions

View File

@@ -21,12 +21,24 @@ const SCHEMA_SAMPLE_SIZE = 100
type ColumnType = 'string' | 'number' | 'boolean' | 'date'
async function parseCsvBuffer(
buffer: Buffer,
delimiter = ','
/**
 * Guesses the field delimiter of delimited text from its header line.
 *
 * Commas, tabs and semicolons are counted on the first non-empty line.
 * Characters inside double-quoted fields are ignored so that a header such as
 * `"Last, First";Age` is not mistaken for comma-separated data. If the quotes
 * on that line are unbalanced (stray quote, or a quoted field spanning lines),
 * the raw counts are used instead.
 *
 * Tab wins only when it strictly outnumbers both alternatives; semicolon wins
 * when it strictly outnumbers commas; every other case, including empty
 * input, yields a comma.
 *
 * @param content - Decoded file content.
 * @returns `'\t'`, `';'` or `','`.
 */
function detectDelimiter(content: string): string {
  // Leading empty lines carry no delimiter information, so look past them.
  const headerLine = content.split(/\r?\n/).find((line) => line.length > 0) ?? ''

  const tally = (respectQuotes: boolean) => {
    let comma = 0
    let tab = 0
    let semi = 0
    let inQuotes = false
    for (const ch of headerLine) {
      if (respectQuotes && ch === '"') {
        // An escaped quote ("") toggles twice and leaves the state unchanged.
        inQuotes = !inQuotes
      } else if (!inQuotes) {
        if (ch === ',') comma++
        else if (ch === '\t') tab++
        else if (ch === ';') semi++
      }
    }
    return { comma, tab, semi, balanced: !inQuotes }
  }

  const quoteAware = tally(true)
  const { comma, tab, semi } = quoteAware.balanced ? quoteAware : tally(false)

  if (tab > comma && tab > semi) return '\t'
  if (semi > comma) return ';'
  return ','
}
export async function parseCsvBuffer(
buffer: Buffer
): Promise<{ headers: string[]; rows: Record<string, unknown>[] }> {
const { parse } = await import('csv-parse/sync')
const parsed = parse(buffer.toString('utf-8'), {
let content = buffer.toString('utf-8')
if (content.charCodeAt(0) === 0xfeff) content = content.slice(1)
const delimiter = detectDelimiter(content)
const parsed = parse(content, {
columns: true,
skip_empty_lines: true,
trim: true,
@@ -100,7 +112,7 @@ function inferSchema(headers: string[], rows: Record<string, unknown>[]): Column
* underscores, and ensures the name starts with a letter or underscore.
* Used for both table names and column names to satisfy NAME_PATTERN.
*/
function sanitizeName(raw: string, fallbackPrefix = 'col'): string {
export function sanitizeName(raw: string, fallbackPrefix = 'col'): string {
let name = raw
.trim()
.replace(/[^a-zA-Z0-9_]/g, '_')
@@ -155,6 +167,86 @@ function coerceRows(
})
}
/**
 * Core CSV-to-table import logic. Parses the buffer, infers the schema,
 * creates the table, then coerces and inserts all rows in batches of
 * `MAX_BATCH_SIZE`. Used by both the HTTP handler and copilot tools.
 *
 * If coercion or any batch insert fails, the freshly created table is deleted
 * (best effort) and the original error is rethrown. Errors raised before the
 * table exists (parsing, limits lookup, table creation) propagate as-is.
 *
 * @param opts.buffer - Raw file contents.
 * @param opts.fileName - Original file name; used for the default table name
 *   and default description.
 * @param opts.workspaceId - Workspace that will own the table.
 * @param opts.userId - User performing the import.
 * @param opts.tableName - Optional explicit table name; when empty or omitted,
 *   a sanitized name is derived from `fileName` without its extension.
 * @param opts.description - Optional description; defaults to
 *   `Imported from <fileName>`.
 * @returns The created table's id and name, the inferred columns, the number
 *   of rows inserted and the normalized schema.
 */
export async function importCsvToTable(opts: {
  buffer: Buffer
  fileName: string
  workspaceId: string
  userId: string
  tableName?: string
  description?: string
}): Promise<{
  tableId: string
  tableName: string
  columns: ColumnDefinition[]
  rowCount: number
  schema: TableSchema
}> {
  const requestId = crypto.randomUUID().slice(0, 8)

  const { headers, rows } = await parseCsvBuffer(opts.buffer)
  const columns = inferSchema(headers, rows)
  // Raw header -> column name, so row values can be re-keyed during coercion.
  const headerToColumn = new Map(headers.map((h, i) => [h, columns[i].name]))

  const tableName =
    opts.tableName || sanitizeName(opts.fileName.replace(/\.[^.]+$/, ''), 'imported_table')
  const planLimits = await getWorkspaceTableLimits(opts.workspaceId)
  const normalizedSchema: TableSchema = {
    columns: columns.map(normalizeColumn),
  }

  const table = await createTable(
    {
      name: tableName,
      description: opts.description || `Imported from ${opts.fileName}`,
      schema: normalizedSchema,
      workspaceId: opts.workspaceId,
      userId: opts.userId,
      maxRows: planLimits.maxRowsPerTable,
      maxTables: planLimits.maxTables,
    },
    requestId
  )

  try {
    const coerced = coerceRows(rows, columns, headerToColumn)
    let inserted = 0
    for (let i = 0; i < coerced.length; i += MAX_BATCH_SIZE) {
      const batch = coerced.slice(i, i + MAX_BATCH_SIZE)
      const batchRequestId = crypto.randomUUID().slice(0, 8)
      const result = await batchInsertRows(
        { tableId: table.id, rows: batch, workspaceId: opts.workspaceId, userId: opts.userId },
        table,
        batchRequestId
      )
      inserted += result.length
    }

    logger.info('CSV imported to table', {
      tableId: table.id,
      fileName: opts.fileName,
      columns: columns.length,
      rows: inserted,
    })

    return {
      tableId: table.id,
      tableName: table.name,
      columns,
      rowCount: inserted,
      schema: normalizedSchema,
    }
  } catch (insertError) {
    // Rollback stays best-effort: a failed delete must not mask the original
    // error, but it is logged so an orphaned table can be traced.
    await deleteTable(table.id, requestId).catch((rollbackError: unknown) => {
      logger.error('Failed to roll back table after CSV import error', {
        tableId: table.id,
        fileName: opts.fileName,
        error: rollbackError instanceof Error ? rollbackError.message : String(rollbackError),
      })
    })
    throw insertError
  }
}
export async function POST(request: NextRequest) {
const requestId = generateRequestId()
@@ -194,69 +286,25 @@ export async function POST(request: NextRequest) {
}
const buffer = Buffer.from(await file.arrayBuffer())
const delimiter = ext === 'tsv' ? '\t' : ','
const { headers, rows } = await parseCsvBuffer(buffer, delimiter)
const result = await importCsvToTable({
buffer,
fileName: file.name,
workspaceId,
userId: authResult.userId,
})
const columns = inferSchema(headers, rows)
const headerToColumn = new Map(headers.map((h, i) => [h, columns[i].name]))
const tableName = sanitizeName(file.name.replace(/\.[^.]+$/, ''), 'imported_table')
const planLimits = await getWorkspaceTableLimits(workspaceId)
const normalizedSchema: TableSchema = {
columns: columns.map(normalizeColumn),
}
const table = await createTable(
{
name: tableName,
description: `Imported from ${file.name}`,
schema: normalizedSchema,
workspaceId,
userId: authResult.userId,
maxRows: planLimits.maxRowsPerTable,
maxTables: planLimits.maxTables,
},
requestId
)
try {
const coerced = coerceRows(rows, columns, headerToColumn)
let inserted = 0
for (let i = 0; i < coerced.length; i += MAX_BATCH_SIZE) {
const batch = coerced.slice(i, i + MAX_BATCH_SIZE)
const batchRequestId = crypto.randomUUID().slice(0, 8)
const result = await batchInsertRows(
{ tableId: table.id, rows: batch, workspaceId, userId: authResult.userId },
table,
batchRequestId
)
inserted += result.length
}
logger.info(`[${requestId}] CSV imported`, {
tableId: table.id,
fileName: file.name,
columns: columns.length,
rows: inserted,
})
return NextResponse.json({
success: true,
data: {
table: {
id: table.id,
name: table.name,
description: table.description,
schema: normalizedSchema,
rowCount: inserted,
},
return NextResponse.json({
success: true,
data: {
table: {
id: result.tableId,
name: result.tableName,
description: `Imported from ${file.name}`,
schema: result.schema,
rowCount: result.rowCount,
},
})
} catch (insertError) {
await deleteTable(table.id, requestId).catch(() => {})
throw insertError
}
},
})
} catch (error) {
const message = error instanceof Error ? error.message : String(error)
logger.error(`[${requestId}] CSV import failed:`, error)

View File

@@ -147,10 +147,31 @@ export async function buildCopilotRequestPayload(
`Read with: read("uploads/${filename}")`,
`To save permanently: materialize_file(fileName: "${filename}")`,
]
if (filename.endsWith('.json')) {
const lower = filename.toLowerCase()
if (lower.endsWith('.csv') || lower.endsWith('.tsv')) {
lines.push(
`To create a table: materialize_file(fileName: "${filename}", operation: "table")`
)
}
if (lower.endsWith('.json')) {
lines.push(
`To import as a workflow: materialize_file(fileName: "${filename}", operation: "import")`
)
lines.push(
`To create a table (if JSON array): materialize_file(fileName: "${filename}", operation: "table")`
)
}
if (
lower.endsWith('.pdf') ||
lower.endsWith('.txt') ||
lower.endsWith('.md') ||
lower.endsWith('.docx') ||
lower.endsWith('.doc') ||
lower.endsWith('.csv')
) {
lines.push(
`To add to knowledge base: materialize_file(fileName: "${filename}", operation: "knowledge_base")`
)
}
uploadContexts.push({
type: 'uploaded_file',

View File

@@ -348,16 +348,9 @@ async function maybeWriteReadCsvToTable(
}
rows = parsed
} else {
const { parse } = await import('csv-parse/sync')
rows = parse(content, {
columns: true,
skip_empty_lines: true,
trim: true,
relax_column_count: true,
relax_quotes: true,
skip_records_with_error: true,
cast: false,
}) as Record<string, unknown>[]
const { parseCsvBuffer } = await import('@/app/api/table/import-csv/route')
const parsed = await parseCsvBuffer(Buffer.from(content, 'utf-8'))
rows = parsed.rows
}
if (rows.length === 0) {
@@ -610,24 +603,6 @@ export async function executeToolAndReport(
}
}
/**
 * Resolves after `ms` milliseconds, or as soon as `abortSignal` fires,
 * whichever comes first. The promise never rejects; callers that need to
 * distinguish the two outcomes must inspect the signal themselves.
 *
 * The `abort` listener is detached when the timer completes normally, so
 * calling this repeatedly in a polling loop does not accumulate listeners on
 * a long-lived signal.
 *
 * @param ms - Maximum time to wait, in milliseconds.
 * @param abortSignal - Optional signal that cuts the wait short.
 */
function abortAwareSleep(ms: number, abortSignal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve) => {
    if (abortSignal?.aborted) {
      resolve()
      return
    }

    const onAbort = () => {
      clearTimeout(timer)
      resolve()
    }
    const timer = setTimeout(() => {
      abortSignal?.removeEventListener('abort', onAbort)
      resolve()
    }, ms)

    abortSignal?.addEventListener('abort', onAbort, { once: true })
  })
}
export async function waitForToolDecision(
toolCallId: string,
timeoutMs: number,
@@ -642,7 +617,7 @@ export async function waitForToolDecision(
if (decision?.status) {
return decision
}
await abortAwareSleep(interval, abortSignal)
await new Promise((resolve) => setTimeout(resolve, interval))
interval = Math.min(interval * TOOL_DECISION_POLL_BACKOFF, maxInterval)
}
return null
@@ -681,7 +656,7 @@ export async function waitForToolCompletion(
) {
return decision
}
await abortAwareSleep(interval, abortSignal)
await new Promise((resolve) => setTimeout(resolve, interval))
interval = Math.min(interval * TOOL_DECISION_POLL_BACKOFF, maxInterval)
}
return null

View File

@@ -2,8 +2,14 @@ import { db } from '@sim/db'
import { workflow, workspaceFiles } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull } from 'drizzle-orm'
import { importCsvToTable } from '@/app/api/table/import-csv/route'
import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/orchestrator/types'
import { getServePathPrefix } from '@/lib/uploads'
import {
createSingleDocument,
processDocumentAsync,
} from '@/lib/knowledge/documents/service'
import { createKnowledgeBase, getKnowledgeBaseById } from '@/lib/knowledge/service'
import { getServePathPrefix, StorageService } from '@/lib/uploads'
import { downloadWorkspaceFile } from '@/lib/uploads/contexts/workspace/workspace-file-manager'
import { parseWorkflowJson } from '@/lib/workflows/operations/import-export'
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/persistence/utils'
@@ -182,6 +188,251 @@ async function executeImport(
}
}
/**
 * Creates a table from a chat upload.
 *
 * `.csv` / `.tsv` uploads are passed to `importCsvToTable` unchanged. `.json`
 * uploads must contain a non-empty array of plain objects; the array is
 * serialized to CSV first so that `importCsvToTable` performs schema
 * inference, sanitization and insertion the same way for every format.
 *
 * @param fileName - Name of the uploaded file; its extension selects the format.
 * @param chatId - Chat the upload belongs to.
 * @param workspaceId - Workspace that will own the table.
 * @param userId - User performing the import.
 * @param tableName - Optional explicit table name.
 * @returns A failed result for a missing upload, invalid JSON, or an
 *   unsupported extension; otherwise a success result describing the table.
 *   Errors thrown by the download or the import propagate to the caller.
 */
async function executeTable(
  fileName: string,
  chatId: string,
  workspaceId: string,
  userId: string,
  tableName?: string
): Promise<ToolCallResult> {
  const upload = await findUploadRecord(fileName, chatId)
  if (!upload) {
    return {
      success: false,
      error: `Upload not found: "${fileName}". Use glob("uploads/*") to list available uploads.`,
    }
  }

  const buffer = await downloadWorkspaceFile(toFileRecord(upload))
  const ext = fileName.split('.').pop()?.toLowerCase()

  let csvBuffer: Buffer
  let sourceFormat: 'CSV' | 'JSON'

  if (ext === 'csv' || ext === 'tsv') {
    csvBuffer = buffer
    sourceFormat = 'CSV'
  } else if (ext === 'json') {
    let parsed: unknown
    try {
      parsed = JSON.parse(buffer.toString('utf-8'))
    } catch {
      return { success: false, error: `"${fileName}" is not valid JSON.` }
    }

    if (!Array.isArray(parsed)) {
      return {
        success: false,
        error: 'JSON file must contain an array of objects for table import.',
      }
    }
    const records: unknown[] = parsed
    if (records.length === 0) {
      return { success: false, error: 'JSON file contains an empty array.' }
    }

    const isPlainObject = (value: unknown): value is Record<string, unknown> =>
      typeof value === 'object' && value !== null && !Array.isArray(value)
    if (!records.every(isPlainObject)) {
      return { success: false, error: 'Each element in the JSON array must be a plain object.' }
    }

    // Union of keys across all records, in first-seen order.
    const headerSet = new Set<string>()
    for (const record of records) {
      for (const key of Object.keys(record)) headerSet.add(key)
    }
    const headers = [...headerSet]

    // Nested arrays/objects are kept as JSON text rather than "[object Object]".
    // A cell is quoted whenever it contains a comma, quote or line break.
    const toCsvCell = (value: unknown): string => {
      if (value === null || value === undefined) return ''
      const text = typeof value === 'object' ? JSON.stringify(value) : String(value)
      return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text
    }
    const hasOwn = Object.prototype.hasOwnProperty

    // Header cells are escaped too, so keys containing commas or quotes
    // cannot shift the columns.
    const csvLines = [headers.map(toCsvCell).join(',')]
    for (const record of records) {
      csvLines.push(
        headers
          // Own-property check: a missing key must yield an empty cell, not an
          // inherited Object.prototype member such as `constructor`.
          .map((h) => toCsvCell(hasOwn.call(record, h) ? record[h] : undefined))
          .join(',')
      )
    }

    csvBuffer = Buffer.from(csvLines.join('\n'), 'utf-8')
    sourceFormat = 'JSON'
  } else {
    return {
      success: false,
      error: `Unsupported file format for table import: "${ext}". Supported: csv, tsv, json`,
    }
  }

  const result = await importCsvToTable({
    buffer: csvBuffer,
    fileName,
    workspaceId,
    userId,
    tableName,
    description: `Imported from uploaded file ${fileName}`,
  })

  logger.info(`Table created from uploaded ${sourceFormat}`, {
    fileName,
    tableId: result.tableId,
    tableName: result.tableName,
    rows: result.rowCount,
    chatId,
  })

  return {
    success: true,
    output: {
      message: `Created table "${result.tableName}" with ${result.columns.length} columns and ${result.rowCount} rows from "${fileName}".`,
      tableId: result.tableId,
      tableName: result.tableName,
      columns: result.columns.map((c) => ({ name: c.name, type: c.type })),
      rowCount: result.rowCount,
    },
    resources: [{ type: 'table', id: result.tableId, title: result.tableName }],
  }
}
/**
 * Saves a chat upload as a workspace file and adds it to a knowledge base.
 *
 * Steps, in order:
 *  1. If `knowledgeBaseId` is given, confirm that knowledge base exists. This
 *     happens before the file is touched, so an invalid id leaves the upload
 *     in the chat for a retry.
 *  2. Move the upload from the `mothership` context to the `workspace`
 *     context and detach it from the chat.
 *  3. If no `knowledgeBaseId` was given, create a knowledge base named after
 *     the file (extension stripped).
 *  4. Create the document record and start processing in the background.
 *
 * @param fileName - Original name of the uploaded file.
 * @param chatId - Chat the upload belongs to.
 * @param workspaceId - Workspace for a newly created knowledge base.
 * @param userId - User performing the operation.
 * @param knowledgeBaseId - Existing knowledge base to add the file to.
 * @returns A failed result when the knowledge base or upload is not found;
 *   otherwise a success result with the file, knowledge base and document ids.
 */
async function executeKnowledgeBase(
  fileName: string,
  chatId: string,
  workspaceId: string,
  userId: string,
  knowledgeBaseId?: string
): Promise<ToolCallResult> {
  const requestId = crypto.randomUUID().slice(0, 8)

  let kb: { id: string; name: string } | undefined
  if (knowledgeBaseId) {
    // NOTE(review): nothing here checks that this knowledge base belongs to
    // `workspaceId` or is accessible to `userId` — confirm that
    // getKnowledgeBaseById or a caller enforces it.
    const existing = await getKnowledgeBaseById(knowledgeBaseId)
    if (!existing) {
      return { success: false, error: `Knowledge base not found: ${knowledgeBaseId}` }
    }
    kb = { id: knowledgeBaseId, name: existing.name }
  }

  const [updated] = await db
    .update(workspaceFiles)
    .set({ context: 'workspace', chatId: null })
    .where(
      and(
        eq(workspaceFiles.originalName, fileName),
        eq(workspaceFiles.chatId, chatId),
        eq(workspaceFiles.context, 'mothership'),
        isNull(workspaceFiles.deletedAt)
      )
    )
    .returning({
      id: workspaceFiles.id,
      originalName: workspaceFiles.originalName,
      key: workspaceFiles.key,
      size: workspaceFiles.size,
      contentType: workspaceFiles.contentType,
    })

  if (!updated) {
    return {
      success: false,
      error: `Upload not found: "${fileName}". Use glob("uploads/*") to list available uploads.`,
    }
  }

  if (!kb) {
    const newKb = await createKnowledgeBase(
      {
        name: fileName.replace(/\.[^.]+$/, ''),
        description: `Created from uploaded file ${fileName}`,
        workspaceId,
        userId,
        embeddingModel: 'text-embedding-3-small',
        embeddingDimension: 1536,
        chunkingConfig: { maxSize: 1024, minSize: 1, overlap: 200 },
      },
      requestId
    )
    kb = { id: newKb.id, name: newKb.name }
  }
  const { id: kbId, name: kbName } = kb

  // NOTE(review): the URL is valid for 5 minutes while processing runs in the
  // background — confirm the file is fetched within that window.
  const presignedUrl = await StorageService.generatePresignedDownloadUrl(
    updated.key,
    'workspace',
    5 * 60
  )

  const documentData = {
    filename: updated.originalName,
    fileUrl: presignedUrl,
    fileSize: updated.size,
    mimeType: updated.contentType,
  }

  const doc = await createSingleDocument(documentData, kbId, requestId)

  // Fire-and-forget: failures are logged, not surfaced to the tool caller.
  void processDocumentAsync(kbId, doc.id, documentData, {}).catch((err) => {
    logger.error('Background document processing failed', {
      documentId: doc.id,
      error: err instanceof Error ? err.message : String(err),
    })
  })

  logger.info('File added to knowledge base via materialize_file', {
    fileName,
    fileId: updated.id,
    knowledgeBaseId: kbId,
    documentId: doc.id,
    chatId,
  })

  return {
    success: true,
    output: {
      message: `File "${fileName}" saved and added to knowledge base "${kbName}". Document processing started (chunking + embedding).`,
      fileId: updated.id,
      knowledgeBaseId: kbId,
      knowledgeBaseName: kbName,
      documentId: doc.id,
    },
    resources: [
      { type: 'file', id: updated.id, title: fileName },
      { type: 'knowledgebase', id: kbId, title: kbName },
    ],
  }
}
export async function executeMaterializeFile(
params: Record<string, unknown>,
context: ExecutionContext
@@ -205,6 +456,24 @@ export async function executeMaterializeFile(
if (operation === 'import') {
return await executeImport(fileName, context.chatId, context.workspaceId, context.userId)
}
if (operation === 'table') {
return await executeTable(
fileName,
context.chatId,
context.workspaceId,
context.userId,
params.tableName as string | undefined
)
}
if (operation === 'knowledge_base') {
return await executeKnowledgeBase(
fileName,
context.chatId,
context.workspaceId,
context.userId,
params.knowledgeBaseId as string | undefined
)
}
return await executeSave(fileName, context.chatId)
} catch (err) {
logger.error('materialize_file failed', {

View File

@@ -1,4 +1,8 @@
import { createLogger } from '@sim/logger'
import {
importCsvToTable,
sanitizeName,
} from '@/app/api/table/import-csv/route'
import type { BaseServerTool, ServerToolContext } from '@/lib/copilot/tools/server/base-tool'
import type { UserTableArgs, UserTableResult } from '@/lib/copilot/tools/shared/schemas'
import { COLUMN_TYPES } from '@/lib/table/constants'
@@ -55,19 +59,9 @@ async function resolveWorkspaceFile(
return { buffer, name: record.name, type: record.type }
}
function parseFileRows(
buffer: Buffer,
fileName: string,
contentType: string
): Promise<{ headers: string[]; rows: Record<string, unknown>[] }> {
function isCsvFile(fileName: string, contentType: string): boolean {
const ext = fileName.split('.').pop()?.toLowerCase()
if (ext === 'json' || contentType === 'application/json') {
return parseJsonRows(buffer)
}
if (ext === 'csv' || ext === 'tsv' || contentType === 'text/csv') {
return parseCsvRows(buffer)
}
throw new Error(`Unsupported file format: "${ext}". Supported: csv, tsv, json`)
return ext === 'csv' || ext === 'tsv' || contentType === 'text/csv'
}
async function parseJsonRows(
@@ -90,27 +84,37 @@ async function parseJsonRows(
return { headers: [...headerSet], rows: parsed }
}
async function parseCsvRows(
buffer: Buffer
): Promise<{ headers: string[]; rows: Record<string, unknown>[] }> {
const { parse } = await import('csv-parse/sync')
const parsed = parse(buffer.toString('utf-8'), {
columns: true,
skip_empty_lines: true,
trim: true,
relax_column_count: true,
relax_quotes: true,
skip_records_with_error: true,
cast: false,
}) as Record<string, unknown>[]
if (parsed.length === 0) {
throw new Error('CSV file has no data rows')
}
const headers = Object.keys(parsed[0])
if (headers.length === 0) {
throw new Error('CSV file has no headers')
}
return { headers, rows: parsed }
/**
 * Converts raw headers into sanitized column names that are unique
 * case-insensitively, and re-keys every row to match.
 *
 * Each header goes through `sanitizeName`. When the result collides with an
 * earlier one (ignoring case), `_2`, `_3`, ... is appended until it is unique.
 * If no header changed, the original row objects are returned untouched;
 * otherwise each row is copied with the new keys, keeping only the row's own
 * properties that correspond to a header.
 *
 * @param headers - Raw header names as read from the file.
 * @param rows - Records keyed by the raw header names.
 * @returns The sanitized headers (same order as `headers`) and rows keyed by them.
 */
function sanitizeHeadersAndRows(
  headers: string[],
  rows: Record<string, unknown>[]
): { headers: string[]; rows: Record<string, unknown>[] } {
  const seen = new Set<string>()
  const headerMap = new Map<string, string>()

  const sanitized = headers.map((raw) => {
    // Sanitize once; only the numeric suffix varies while resolving collisions.
    const base = sanitizeName(raw)
    let clean = base
    for (let suffix = 2; seen.has(clean.toLowerCase()); suffix++) {
      clean = `${base}_${suffix}`
    }
    seen.add(clean.toLowerCase())
    headerMap.set(raw, clean)
    return clean
  })

  const needsRemap = headers.some((h, i) => h !== sanitized[i])
  if (!needsRemap) {
    return { headers: sanitized, rows }
  }

  const hasOwn = Object.prototype.hasOwnProperty
  const remappedRows = rows.map((row) => {
    const out: Record<string, unknown> = {}
    for (const [raw, clean] of headerMap) {
      // Own-property check: `in` would also match inherited members such as
      // `constructor` or `toString` on rows that lack the key.
      if (hasOwn.call(row, raw)) out[clean] = row[raw]
    }
    return out
  })

  return { headers: sanitized, rows: remappedRows }
}
function inferColumnType(values: unknown[]): ColumnType {
@@ -640,13 +644,55 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
}
const file = await resolveWorkspaceFile(filePath, workspaceId)
const { headers, rows } = await parseFileRows(file.buffer, file.name, file.type)
if (rows.length === 0) {
const ext = file.name.split('.').pop()?.toLowerCase()
if (isCsvFile(file.name, file.type)) {
const result = await importCsvToTable({
buffer: file.buffer,
fileName: file.name,
workspaceId,
userId: context.userId,
tableName: args.name,
description: args.description || `Imported from ${file.name}`,
})
logger.info('Table created from CSV file', {
tableId: result.tableId,
fileName: file.name,
columns: result.columns.length,
rows: result.rowCount,
userId: context.userId,
})
return {
success: true,
message: `Created table "${result.tableName}" with ${result.columns.length} columns and ${result.rowCount} rows from "${file.name}"`,
data: {
tableId: result.tableId,
tableName: result.tableName,
columns: result.columns.map((c) => ({ name: c.name, type: c.type })),
rowCount: result.rowCount,
sourceFile: file.name,
},
}
}
if (ext !== 'json' && file.type !== 'application/json') {
return {
success: false,
message: `Unsupported file format: "${ext}". Supported: csv, tsv, json`,
}
}
const { headers: rawHeaders, rows: rawRows } = await parseJsonRows(file.buffer)
if (rawRows.length === 0) {
return { success: false, message: 'File contains no data rows' }
}
const { headers, rows } = sanitizeHeadersAndRows(rawHeaders, rawRows)
const columns = inferSchema(headers, rows)
const tableName = args.name || file.name.replace(/\.[^.]+$/, '')
const tableName =
args.name || sanitizeName(file.name.replace(/\.[^.]+$/, ''), 'imported_table')
const requestId = crypto.randomUUID().slice(0, 8)
const table = await createTable(
{
@@ -663,7 +709,7 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
const coerced = coerceRows(rows, columns, columnMap)
const inserted = await batchInsertAll(table.id, coerced, table, workspaceId)
logger.info('Table created from file', {
logger.info('Table created from JSON file', {
tableId: table.id,
fileName: file.name,
columns: columns.length,
@@ -703,11 +749,34 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
}
const file = await resolveWorkspaceFile(filePath, workspaceId)
const { headers, rows } = await parseFileRows(file.buffer, file.name, file.type)
if (rows.length === 0) {
const ext = file.name.split('.').pop()?.toLowerCase()
const isJson = ext === 'json' || file.type === 'application/json'
const isCsv = isCsvFile(file.name, file.type)
if (!isJson && !isCsv) {
return {
success: false,
message: `Unsupported file format: "${ext}". Supported: csv, tsv, json`,
}
}
let rawHeaders: string[]
let rawRows: Record<string, unknown>[]
if (isJson) {
const parsed = await parseJsonRows(file.buffer)
rawHeaders = parsed.headers
rawRows = parsed.rows
} else {
const { parseCsvBuffer } = await import('@/app/api/table/import-csv/route')
const parsed = await parseCsvBuffer(file.buffer)
rawHeaders = parsed.headers
rawRows = parsed.rows
}
if (rawRows.length === 0) {
return { success: false, message: 'File contains no data rows' }
}
const { headers, rows } = sanitizeHeadersAndRows(rawHeaders, rawRows)
const tableColumns = table.schema.columns as ColumnDefinition[]
const tableColNames = new Set(tableColumns.map((c) => c.name))
const mappedHeaders = headers.filter((h) => tableColNames.has(h))