mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
Compare commits
1 Commits
dev
...
fix/mother
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
539900e672 |
@@ -21,12 +21,24 @@ const SCHEMA_SAMPLE_SIZE = 100
|
||||
|
||||
type ColumnType = 'string' | 'number' | 'boolean' | 'date'
|
||||
|
||||
async function parseCsvBuffer(
|
||||
buffer: Buffer,
|
||||
delimiter = ','
|
||||
/**
 * Guesses the field delimiter of delimited text from its header line.
 *
 * Inspects the first non-empty line (the parser is configured to skip empty
 * lines, so that is the line it will treat as the header) and counts commas,
 * tabs, and semicolons that sit outside double-quoted fields. This keeps a
 * quoted header such as `"last; first",age` from being mistaken for
 * semicolon-separated data. When the quotes on that line are unbalanced the
 * quote-aware count cannot be trusted, so the raw character counts are used.
 *
 * @param content - Decoded file content.
 * @returns `'\t'` when tabs strictly outnumber both commas and semicolons,
 *   `';'` when semicolons strictly outnumber commas, otherwise `','`.
 */
function detectDelimiter(content: string): string {
  const headerLine = content.split(/\r?\n/).find((line) => line.length > 0) ?? ''

  const countDelimiters = (respectQuotes: boolean) => {
    let commas = 0
    let tabs = 0
    let semis = 0
    let inQuotes = false
    for (const ch of headerLine) {
      if (ch === '"') {
        // An escaped quote ("") toggles twice, leaving the state unchanged.
        inQuotes = !inQuotes
        continue
      }
      if (respectQuotes && inQuotes) continue
      if (ch === ',') commas++
      else if (ch === '\t') tabs++
      else if (ch === ';') semis++
    }
    return { commas, tabs, semis, balanced: !inQuotes }
  }

  const quoteAware = countDelimiters(true)
  const counts = quoteAware.balanced ? quoteAware : countDelimiters(false)

  if (counts.tabs > counts.commas && counts.tabs > counts.semis) return '\t'
  if (counts.semis > counts.commas) return ';'
  return ','
}
|
||||
|
||||
export async function parseCsvBuffer(
|
||||
buffer: Buffer
|
||||
): Promise<{ headers: string[]; rows: Record<string, unknown>[] }> {
|
||||
const { parse } = await import('csv-parse/sync')
|
||||
const parsed = parse(buffer.toString('utf-8'), {
|
||||
let content = buffer.toString('utf-8')
|
||||
if (content.charCodeAt(0) === 0xfeff) content = content.slice(1)
|
||||
const delimiter = detectDelimiter(content)
|
||||
const parsed = parse(content, {
|
||||
columns: true,
|
||||
skip_empty_lines: true,
|
||||
trim: true,
|
||||
@@ -100,7 +112,7 @@ function inferSchema(headers: string[], rows: Record<string, unknown>[]): Column
|
||||
* underscores, and ensures the name starts with a letter or underscore.
|
||||
* Used for both table names and column names to satisfy NAME_PATTERN.
|
||||
*/
|
||||
function sanitizeName(raw: string, fallbackPrefix = 'col'): string {
|
||||
export function sanitizeName(raw: string, fallbackPrefix = 'col'): string {
|
||||
let name = raw
|
||||
.trim()
|
||||
.replace(/[^a-zA-Z0-9_]/g, '_')
|
||||
@@ -155,6 +167,86 @@ function coerceRows(
|
||||
})
|
||||
}
|
||||
|
||||
/**
 * Core CSV-to-table import logic. Parses the buffer, infers a schema from the
 * parsed headers and rows, creates the table, then coerces the rows and
 * inserts them in batches of `MAX_BATCH_SIZE`.
 *
 * If anything fails after the table has been created, the table is deleted
 * (best effort) and the original error is rethrown. A failure of that rollback
 * is logged rather than silently dropped, so an orphaned table can be traced.
 *
 * @param opts.buffer - Raw file bytes to parse.
 * @param opts.fileName - Original file name; used for the default table name
 *   (extension stripped, then sanitized) and the default description.
 * @param opts.workspaceId - Workspace that will own the table.
 * @param opts.userId - User performing the import.
 * @param opts.tableName - Optional explicit table name; used as given.
 * @param opts.description - Optional description; defaults to
 *   `Imported from <fileName>`.
 * @returns The created table's id and name, the inferred columns, the number
 *   of rows reported inserted, and the normalized schema.
 * @throws Whatever parsing, table creation, or row insertion throws.
 */
export async function importCsvToTable(opts: {
  buffer: Buffer
  fileName: string
  workspaceId: string
  userId: string
  tableName?: string
  description?: string
}): Promise<{
  tableId: string
  tableName: string
  columns: ColumnDefinition[]
  rowCount: number
  schema: TableSchema
}> {
  const requestId = crypto.randomUUID().slice(0, 8)
  const { headers, rows } = await parseCsvBuffer(opts.buffer)

  const columns = inferSchema(headers, rows)
  // Maps each original CSV header to the column name chosen by inferSchema.
  const headerToColumn = new Map(headers.map((h, i) => [h, columns[i].name]))

  const tableName =
    opts.tableName || sanitizeName(opts.fileName.replace(/\.[^.]+$/, ''), 'imported_table')
  const planLimits = await getWorkspaceTableLimits(opts.workspaceId)

  const normalizedSchema: TableSchema = {
    columns: columns.map(normalizeColumn),
  }

  const table = await createTable(
    {
      name: tableName,
      description: opts.description || `Imported from ${opts.fileName}`,
      schema: normalizedSchema,
      workspaceId: opts.workspaceId,
      userId: opts.userId,
      maxRows: planLimits.maxRowsPerTable,
      maxTables: planLimits.maxTables,
    },
    requestId
  )

  try {
    const coerced = coerceRows(rows, columns, headerToColumn)
    let inserted = 0
    for (let i = 0; i < coerced.length; i += MAX_BATCH_SIZE) {
      const batch = coerced.slice(i, i + MAX_BATCH_SIZE)
      const batchRequestId = crypto.randomUUID().slice(0, 8)
      const result = await batchInsertRows(
        { tableId: table.id, rows: batch, workspaceId: opts.workspaceId, userId: opts.userId },
        table,
        batchRequestId
      )
      inserted += result.length
    }

    logger.info('CSV imported to table', {
      tableId: table.id,
      fileName: opts.fileName,
      columns: columns.length,
      rows: inserted,
    })

    return {
      tableId: table.id,
      tableName: table.name,
      columns,
      rowCount: inserted,
      schema: normalizedSchema,
    }
  } catch (insertError) {
    // Rollback stays best-effort: a failed delete must not mask the original
    // error, but it is logged so the leftover table can be found.
    await deleteTable(table.id, requestId).catch((rollbackError: unknown) => {
      logger.error(`[${requestId}] Failed to roll back table after CSV import failure`, {
        tableId: table.id,
        error: rollbackError instanceof Error ? rollbackError.message : String(rollbackError),
      })
    })
    throw insertError
  }
}
|
||||
|
||||
export async function POST(request: NextRequest) {
|
||||
const requestId = generateRequestId()
|
||||
|
||||
@@ -194,69 +286,25 @@ export async function POST(request: NextRequest) {
|
||||
}
|
||||
|
||||
const buffer = Buffer.from(await file.arrayBuffer())
|
||||
const delimiter = ext === 'tsv' ? '\t' : ','
|
||||
const { headers, rows } = await parseCsvBuffer(buffer, delimiter)
|
||||
const result = await importCsvToTable({
|
||||
buffer,
|
||||
fileName: file.name,
|
||||
workspaceId,
|
||||
userId: authResult.userId,
|
||||
})
|
||||
|
||||
const columns = inferSchema(headers, rows)
|
||||
const headerToColumn = new Map(headers.map((h, i) => [h, columns[i].name]))
|
||||
|
||||
const tableName = sanitizeName(file.name.replace(/\.[^.]+$/, ''), 'imported_table')
|
||||
const planLimits = await getWorkspaceTableLimits(workspaceId)
|
||||
|
||||
const normalizedSchema: TableSchema = {
|
||||
columns: columns.map(normalizeColumn),
|
||||
}
|
||||
|
||||
const table = await createTable(
|
||||
{
|
||||
name: tableName,
|
||||
description: `Imported from ${file.name}`,
|
||||
schema: normalizedSchema,
|
||||
workspaceId,
|
||||
userId: authResult.userId,
|
||||
maxRows: planLimits.maxRowsPerTable,
|
||||
maxTables: planLimits.maxTables,
|
||||
},
|
||||
requestId
|
||||
)
|
||||
|
||||
try {
|
||||
const coerced = coerceRows(rows, columns, headerToColumn)
|
||||
let inserted = 0
|
||||
for (let i = 0; i < coerced.length; i += MAX_BATCH_SIZE) {
|
||||
const batch = coerced.slice(i, i + MAX_BATCH_SIZE)
|
||||
const batchRequestId = crypto.randomUUID().slice(0, 8)
|
||||
const result = await batchInsertRows(
|
||||
{ tableId: table.id, rows: batch, workspaceId, userId: authResult.userId },
|
||||
table,
|
||||
batchRequestId
|
||||
)
|
||||
inserted += result.length
|
||||
}
|
||||
|
||||
logger.info(`[${requestId}] CSV imported`, {
|
||||
tableId: table.id,
|
||||
fileName: file.name,
|
||||
columns: columns.length,
|
||||
rows: inserted,
|
||||
})
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
data: {
|
||||
table: {
|
||||
id: table.id,
|
||||
name: table.name,
|
||||
description: table.description,
|
||||
schema: normalizedSchema,
|
||||
rowCount: inserted,
|
||||
},
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
data: {
|
||||
table: {
|
||||
id: result.tableId,
|
||||
name: result.tableName,
|
||||
description: `Imported from ${file.name}`,
|
||||
schema: result.schema,
|
||||
rowCount: result.rowCount,
|
||||
},
|
||||
})
|
||||
} catch (insertError) {
|
||||
await deleteTable(table.id, requestId).catch(() => {})
|
||||
throw insertError
|
||||
}
|
||||
},
|
||||
})
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error)
|
||||
logger.error(`[${requestId}] CSV import failed:`, error)
|
||||
|
||||
@@ -147,10 +147,31 @@ export async function buildCopilotRequestPayload(
|
||||
`Read with: read("uploads/${filename}")`,
|
||||
`To save permanently: materialize_file(fileName: "${filename}")`,
|
||||
]
|
||||
if (filename.endsWith('.json')) {
|
||||
const lower = filename.toLowerCase()
|
||||
if (lower.endsWith('.csv') || lower.endsWith('.tsv')) {
|
||||
lines.push(
|
||||
`To create a table: materialize_file(fileName: "${filename}", operation: "table")`
|
||||
)
|
||||
}
|
||||
if (lower.endsWith('.json')) {
|
||||
lines.push(
|
||||
`To import as a workflow: materialize_file(fileName: "${filename}", operation: "import")`
|
||||
)
|
||||
lines.push(
|
||||
`To create a table (if JSON array): materialize_file(fileName: "${filename}", operation: "table")`
|
||||
)
|
||||
}
|
||||
if (
|
||||
lower.endsWith('.pdf') ||
|
||||
lower.endsWith('.txt') ||
|
||||
lower.endsWith('.md') ||
|
||||
lower.endsWith('.docx') ||
|
||||
lower.endsWith('.doc') ||
|
||||
lower.endsWith('.csv')
|
||||
) {
|
||||
lines.push(
|
||||
`To add to knowledge base: materialize_file(fileName: "${filename}", operation: "knowledge_base")`
|
||||
)
|
||||
}
|
||||
uploadContexts.push({
|
||||
type: 'uploaded_file',
|
||||
|
||||
@@ -348,16 +348,9 @@ async function maybeWriteReadCsvToTable(
|
||||
}
|
||||
rows = parsed
|
||||
} else {
|
||||
const { parse } = await import('csv-parse/sync')
|
||||
rows = parse(content, {
|
||||
columns: true,
|
||||
skip_empty_lines: true,
|
||||
trim: true,
|
||||
relax_column_count: true,
|
||||
relax_quotes: true,
|
||||
skip_records_with_error: true,
|
||||
cast: false,
|
||||
}) as Record<string, unknown>[]
|
||||
const { parseCsvBuffer } = await import('@/app/api/table/import-csv/route')
|
||||
const parsed = await parseCsvBuffer(Buffer.from(content, 'utf-8'))
|
||||
rows = parsed.rows
|
||||
}
|
||||
|
||||
if (rows.length === 0) {
|
||||
@@ -610,24 +603,6 @@ export async function executeToolAndReport(
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Sleeps for `ms` milliseconds, waking early if `abortSignal` fires.
 *
 * The promise always resolves (never rejects): immediately when the signal is
 * already aborted, on abort while waiting, or when the timer elapses. The
 * abort listener is removed when the timer elapses so that repeated calls with
 * the same long-lived signal (e.g. a polling loop) do not accumulate listeners.
 *
 * @param ms - Maximum time to wait, in milliseconds.
 * @param abortSignal - Optional signal that cuts the wait short.
 */
function abortAwareSleep(ms: number, abortSignal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve) => {
    if (abortSignal?.aborted) {
      resolve()
      return
    }

    const onAbort = () => {
      clearTimeout(timer)
      resolve()
    }

    const timer = setTimeout(() => {
      // Normal completion: detach the listener, otherwise it stays registered
      // on the signal until the signal itself aborts.
      abortSignal?.removeEventListener('abort', onAbort)
      resolve()
    }, ms)

    abortSignal?.addEventListener('abort', onAbort, { once: true })
  })
}
|
||||
|
||||
export async function waitForToolDecision(
|
||||
toolCallId: string,
|
||||
timeoutMs: number,
|
||||
@@ -642,7 +617,7 @@ export async function waitForToolDecision(
|
||||
if (decision?.status) {
|
||||
return decision
|
||||
}
|
||||
await abortAwareSleep(interval, abortSignal)
|
||||
await new Promise((resolve) => setTimeout(resolve, interval))
|
||||
interval = Math.min(interval * TOOL_DECISION_POLL_BACKOFF, maxInterval)
|
||||
}
|
||||
return null
|
||||
@@ -681,7 +656,7 @@ export async function waitForToolCompletion(
|
||||
) {
|
||||
return decision
|
||||
}
|
||||
await abortAwareSleep(interval, abortSignal)
|
||||
await new Promise((resolve) => setTimeout(resolve, interval))
|
||||
interval = Math.min(interval * TOOL_DECISION_POLL_BACKOFF, maxInterval)
|
||||
}
|
||||
return null
|
||||
|
||||
@@ -2,8 +2,14 @@ import { db } from '@sim/db'
|
||||
import { workflow, workspaceFiles } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, eq, isNull } from 'drizzle-orm'
|
||||
import { importCsvToTable } from '@/app/api/table/import-csv/route'
|
||||
import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/orchestrator/types'
|
||||
import { getServePathPrefix } from '@/lib/uploads'
|
||||
import {
|
||||
createSingleDocument,
|
||||
processDocumentAsync,
|
||||
} from '@/lib/knowledge/documents/service'
|
||||
import { createKnowledgeBase, getKnowledgeBaseById } from '@/lib/knowledge/service'
|
||||
import { getServePathPrefix, StorageService } from '@/lib/uploads'
|
||||
import { downloadWorkspaceFile } from '@/lib/uploads/contexts/workspace/workspace-file-manager'
|
||||
import { parseWorkflowJson } from '@/lib/workflows/operations/import-export'
|
||||
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/persistence/utils'
|
||||
@@ -182,6 +188,251 @@ async function executeImport(
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Creates a table from a chat upload (`materialize_file` with operation
 * `"table"`).
 *
 * CSV/TSV uploads are handed to `importCsvToTable` unchanged. JSON uploads must
 * contain a non-empty array of plain objects; they are serialized to CSV in
 * memory and sent through the same importer so that schema inference, name
 * sanitization, and insertion behave the same for both formats.
 *
 * @param fileName - Original name of the uploaded file.
 * @param chatId - Chat the upload belongs to.
 * @param workspaceId - Workspace that will own the new table.
 * @param userId - User performing the import.
 * @param tableName - Optional explicit table name passed to the importer.
 * @returns A success result describing the created table, or a failure result
 *   when the upload is missing, the JSON is invalid or has the wrong shape, or
 *   the extension is not csv/tsv/json.
 */
async function executeTable(
  fileName: string,
  chatId: string,
  workspaceId: string,
  userId: string,
  tableName?: string
): Promise<ToolCallResult> {
  const row = await findUploadRecord(fileName, chatId)
  if (!row) {
    return {
      success: false,
      error: `Upload not found: "${fileName}". Use glob("uploads/*") to list available uploads.`,
    }
  }

  const buffer = await downloadWorkspaceFile(toFileRecord(row))
  const ext = fileName.split('.').pop()?.toLowerCase()

  // Shared tail of both branches: run the import, log it, build the tool result.
  const importAndReport = async (source: Buffer, logMessage: string): Promise<ToolCallResult> => {
    const result = await importCsvToTable({
      buffer: source,
      fileName,
      workspaceId,
      userId,
      tableName,
      description: `Imported from uploaded file ${fileName}`,
    })

    logger.info(logMessage, {
      fileName,
      tableId: result.tableId,
      tableName: result.tableName,
      rows: result.rowCount,
      chatId,
    })

    return {
      success: true,
      output: {
        message: `Created table "${result.tableName}" with ${result.columns.length} columns and ${result.rowCount} rows from "${fileName}".`,
        tableId: result.tableId,
        tableName: result.tableName,
        columns: result.columns.map((c) => ({ name: c.name, type: c.type })),
        rowCount: result.rowCount,
      },
      resources: [{ type: 'table', id: result.tableId, title: result.tableName }],
    }
  }

  if (ext === 'csv' || ext === 'tsv') {
    return importAndReport(buffer, 'Table created from uploaded CSV')
  }

  if (ext === 'json') {
    let parsed: unknown
    try {
      parsed = JSON.parse(buffer.toString('utf-8'))
    } catch {
      return { success: false, error: `"${fileName}" is not valid JSON.` }
    }

    if (!Array.isArray(parsed)) {
      return {
        success: false,
        error: 'JSON file must contain an array of objects for table import.',
      }
    }
    if (parsed.length === 0) {
      return { success: false, error: 'JSON file contains an empty array.' }
    }

    // Validate every element before treating it as a record, collecting the
    // union of keys (in first-seen order) as the header row.
    const records: Record<string, unknown>[] = []
    const headerSet = new Set<string>()
    for (const item of parsed) {
      if (typeof item !== 'object' || item === null || Array.isArray(item)) {
        return { success: false, error: 'Each element in the JSON array must be a plain object.' }
      }
      const record = item as Record<string, unknown>
      for (const key of Object.keys(record)) headerSet.add(key)
      records.push(record)
    }
    const headers = [...headerSet]

    // Serializes one cell. Nested objects/arrays are kept as JSON text instead
    // of collapsing to "[object Object]"; anything containing a quote, comma,
    // or line break is quoted with embedded quotes doubled.
    const toCsvField = (value: unknown): string => {
      if (value === null || value === undefined) return ''
      const text = typeof value === 'object' ? JSON.stringify(value) : String(value)
      return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text
    }

    // Header names are escaped too: JSON keys may contain commas or quotes.
    const csvLines = [headers.map(toCsvField).join(',')]
    for (const record of records) {
      csvLines.push(headers.map((h) => toCsvField(record[h])).join(','))
    }
    const csvBuffer = Buffer.from(csvLines.join('\n'), 'utf-8')

    return importAndReport(csvBuffer, 'Table created from uploaded JSON')
  }

  return {
    success: false,
    error: `Unsupported file format for table import: "${ext}". Supported: csv, tsv, json`,
  }
}
|
||||
|
||||
/**
 * Saves a chat upload into the workspace and adds it to a knowledge base
 * (`materialize_file` with operation `"knowledge_base"`).
 *
 * Steps, in order:
 *  1. If `knowledgeBaseId` was supplied, confirm it exists. This happens before
 *     anything is written, so an unknown id no longer leaves the upload
 *     converted to a workspace file while the call reports failure.
 *  2. Move the upload record from the `mothership` context to `workspace`.
 *  3. If no knowledge base was supplied, create one named after the file.
 *  4. Create the document record and start background processing; processing
 *     failures are logged and do not fail this call.
 *
 * @param fileName - Original name of the uploaded file.
 * @param chatId - Chat the upload belongs to.
 * @param workspaceId - Workspace that owns the file and any new knowledge base.
 * @param userId - User performing the operation.
 * @param knowledgeBaseId - Optional existing knowledge base to add the file to.
 * @returns A success result with file, knowledge base, and document ids, or a
 *   failure result when the knowledge base or the upload cannot be found.
 */
async function executeKnowledgeBase(
  fileName: string,
  chatId: string,
  workspaceId: string,
  userId: string,
  knowledgeBaseId?: string
): Promise<ToolCallResult> {
  const requestId = crypto.randomUUID().slice(0, 8)

  // Validate the target first so a bad id has no side effects.
  // NOTE(review): the lookup is by id only; confirm whether the knowledge base
  // should also be checked against `workspaceId`.
  const existing = knowledgeBaseId ? await getKnowledgeBaseById(knowledgeBaseId) : null
  if (knowledgeBaseId && !existing) {
    return { success: false, error: `Knowledge base not found: ${knowledgeBaseId}` }
  }

  const [updated] = await db
    .update(workspaceFiles)
    .set({ context: 'workspace', chatId: null })
    .where(
      and(
        eq(workspaceFiles.originalName, fileName),
        eq(workspaceFiles.chatId, chatId),
        eq(workspaceFiles.context, 'mothership'),
        isNull(workspaceFiles.deletedAt)
      )
    )
    .returning({
      id: workspaceFiles.id,
      originalName: workspaceFiles.originalName,
      key: workspaceFiles.key,
      size: workspaceFiles.size,
      contentType: workspaceFiles.contentType,
    })

  if (!updated) {
    return {
      success: false,
      error: `Upload not found: "${fileName}". Use glob("uploads/*") to list available uploads.`,
    }
  }

  let kbId: string
  let kbName: string
  if (knowledgeBaseId && existing) {
    kbId = knowledgeBaseId
    kbName = existing.name
  } else {
    const baseName = fileName.replace(/\.[^.]+$/, '')
    const newKb = await createKnowledgeBase(
      {
        name: baseName,
        description: `Created from uploaded file ${fileName}`,
        workspaceId,
        userId,
        embeddingModel: 'text-embedding-3-small',
        embeddingDimension: 1536,
        chunkingConfig: { maxSize: 1024, minSize: 1, overlap: 200 },
      },
      requestId
    )
    kbId = newKb.id
    kbName = newKb.name
  }

  const presignedUrl = await StorageService.generatePresignedDownloadUrl(
    updated.key,
    'workspace',
    5 * 60
  )

  const documentData = {
    filename: updated.originalName,
    fileUrl: presignedUrl,
    fileSize: updated.size,
    mimeType: updated.contentType,
  }

  const doc = await createSingleDocument(documentData, kbId, requestId)

  // Fire-and-forget: processing continues after this call returns.
  processDocumentAsync(kbId, doc.id, documentData, {}).catch((err) => {
    logger.error('Background document processing failed', {
      documentId: doc.id,
      error: err instanceof Error ? err.message : String(err),
    })
  })

  logger.info('File added to knowledge base via materialize_file', {
    fileName,
    fileId: updated.id,
    knowledgeBaseId: kbId,
    documentId: doc.id,
    chatId,
  })

  return {
    success: true,
    output: {
      message: `File "${fileName}" saved and added to knowledge base "${kbName}". Document processing started (chunking + embedding).`,
      fileId: updated.id,
      knowledgeBaseId: kbId,
      knowledgeBaseName: kbName,
      documentId: doc.id,
    },
    resources: [
      { type: 'file', id: updated.id, title: fileName },
      { type: 'knowledgebase', id: kbId, title: kbName },
    ],
  }
}
|
||||
|
||||
export async function executeMaterializeFile(
|
||||
params: Record<string, unknown>,
|
||||
context: ExecutionContext
|
||||
@@ -205,6 +456,24 @@ export async function executeMaterializeFile(
|
||||
if (operation === 'import') {
|
||||
return await executeImport(fileName, context.chatId, context.workspaceId, context.userId)
|
||||
}
|
||||
if (operation === 'table') {
|
||||
return await executeTable(
|
||||
fileName,
|
||||
context.chatId,
|
||||
context.workspaceId,
|
||||
context.userId,
|
||||
params.tableName as string | undefined
|
||||
)
|
||||
}
|
||||
if (operation === 'knowledge_base') {
|
||||
return await executeKnowledgeBase(
|
||||
fileName,
|
||||
context.chatId,
|
||||
context.workspaceId,
|
||||
context.userId,
|
||||
params.knowledgeBaseId as string | undefined
|
||||
)
|
||||
}
|
||||
return await executeSave(fileName, context.chatId)
|
||||
} catch (err) {
|
||||
logger.error('materialize_file failed', {
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import {
|
||||
importCsvToTable,
|
||||
sanitizeName,
|
||||
} from '@/app/api/table/import-csv/route'
|
||||
import type { BaseServerTool, ServerToolContext } from '@/lib/copilot/tools/server/base-tool'
|
||||
import type { UserTableArgs, UserTableResult } from '@/lib/copilot/tools/shared/schemas'
|
||||
import { COLUMN_TYPES } from '@/lib/table/constants'
|
||||
@@ -55,19 +59,9 @@ async function resolveWorkspaceFile(
|
||||
return { buffer, name: record.name, type: record.type }
|
||||
}
|
||||
|
||||
function parseFileRows(
|
||||
buffer: Buffer,
|
||||
fileName: string,
|
||||
contentType: string
|
||||
): Promise<{ headers: string[]; rows: Record<string, unknown>[] }> {
|
||||
function isCsvFile(fileName: string, contentType: string): boolean {
|
||||
const ext = fileName.split('.').pop()?.toLowerCase()
|
||||
if (ext === 'json' || contentType === 'application/json') {
|
||||
return parseJsonRows(buffer)
|
||||
}
|
||||
if (ext === 'csv' || ext === 'tsv' || contentType === 'text/csv') {
|
||||
return parseCsvRows(buffer)
|
||||
}
|
||||
throw new Error(`Unsupported file format: "${ext}". Supported: csv, tsv, json`)
|
||||
return ext === 'csv' || ext === 'tsv' || contentType === 'text/csv'
|
||||
}
|
||||
|
||||
async function parseJsonRows(
|
||||
@@ -90,27 +84,37 @@ async function parseJsonRows(
|
||||
return { headers: [...headerSet], rows: parsed }
|
||||
}
|
||||
|
||||
async function parseCsvRows(
|
||||
buffer: Buffer
|
||||
): Promise<{ headers: string[]; rows: Record<string, unknown>[] }> {
|
||||
const { parse } = await import('csv-parse/sync')
|
||||
const parsed = parse(buffer.toString('utf-8'), {
|
||||
columns: true,
|
||||
skip_empty_lines: true,
|
||||
trim: true,
|
||||
relax_column_count: true,
|
||||
relax_quotes: true,
|
||||
skip_records_with_error: true,
|
||||
cast: false,
|
||||
}) as Record<string, unknown>[]
|
||||
if (parsed.length === 0) {
|
||||
throw new Error('CSV file has no data rows')
|
||||
}
|
||||
const headers = Object.keys(parsed[0])
|
||||
if (headers.length === 0) {
|
||||
throw new Error('CSV file has no headers')
|
||||
}
|
||||
return { headers, rows: parsed }
|
||||
/**
 * Sanitizes header names with `sanitizeName` and re-keys the rows to match.
 *
 * Sanitized names are made unique case-insensitively: when a name is already
 * taken, `_2`, `_3`, … is appended until it is free. Rows are rebuilt only when
 * at least one header actually changed; otherwise the input rows are returned
 * as-is.
 *
 * Only a row's own properties are copied. Using `in` would also match
 * inherited members, so a header such as `constructor` or `toString` would
 * inject `Object.prototype` values into rows that lack that key.
 *
 * @param headers - Raw header names as they appear in the file.
 * @param rows - Parsed rows keyed by the raw header names.
 * @returns The sanitized headers (same order as `headers`) and the rows keyed
 *   by those sanitized names.
 */
function sanitizeHeadersAndRows(
  headers: string[],
  rows: Record<string, unknown>[]
): { headers: string[]; rows: Record<string, unknown>[] } {
  const seen = new Set<string>()
  const headerMap = new Map<string, string>()

  const sanitized = headers.map((raw) => {
    // Sanitize once; the dedupe loop below only varies the numeric suffix.
    const base = sanitizeName(raw)
    let clean = base
    let suffix = 2
    while (seen.has(clean.toLowerCase())) {
      clean = `${base}_${suffix}`
      suffix++
    }
    seen.add(clean.toLowerCase())
    headerMap.set(raw, clean)
    return clean
  })

  const needsRemap = headers.some((h, i) => h !== sanitized[i])
  if (!needsRemap) {
    return { headers: sanitized, rows }
  }

  const remappedRows = rows.map((row) => {
    const out: Record<string, unknown> = {}
    for (const [raw, clean] of headerMap) {
      if (Object.prototype.hasOwnProperty.call(row, raw)) out[clean] = row[raw]
    }
    return out
  })

  return { headers: sanitized, rows: remappedRows }
}
|
||||
|
||||
function inferColumnType(values: unknown[]): ColumnType {
|
||||
@@ -640,13 +644,55 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
|
||||
}
|
||||
|
||||
const file = await resolveWorkspaceFile(filePath, workspaceId)
|
||||
const { headers, rows } = await parseFileRows(file.buffer, file.name, file.type)
|
||||
if (rows.length === 0) {
|
||||
const ext = file.name.split('.').pop()?.toLowerCase()
|
||||
|
||||
if (isCsvFile(file.name, file.type)) {
|
||||
const result = await importCsvToTable({
|
||||
buffer: file.buffer,
|
||||
fileName: file.name,
|
||||
workspaceId,
|
||||
userId: context.userId,
|
||||
tableName: args.name,
|
||||
description: args.description || `Imported from ${file.name}`,
|
||||
})
|
||||
|
||||
logger.info('Table created from CSV file', {
|
||||
tableId: result.tableId,
|
||||
fileName: file.name,
|
||||
columns: result.columns.length,
|
||||
rows: result.rowCount,
|
||||
userId: context.userId,
|
||||
})
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message: `Created table "${result.tableName}" with ${result.columns.length} columns and ${result.rowCount} rows from "${file.name}"`,
|
||||
data: {
|
||||
tableId: result.tableId,
|
||||
tableName: result.tableName,
|
||||
columns: result.columns.map((c) => ({ name: c.name, type: c.type })),
|
||||
rowCount: result.rowCount,
|
||||
sourceFile: file.name,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
if (ext !== 'json' && file.type !== 'application/json') {
|
||||
return {
|
||||
success: false,
|
||||
message: `Unsupported file format: "${ext}". Supported: csv, tsv, json`,
|
||||
}
|
||||
}
|
||||
|
||||
const { headers: rawHeaders, rows: rawRows } = await parseJsonRows(file.buffer)
|
||||
if (rawRows.length === 0) {
|
||||
return { success: false, message: 'File contains no data rows' }
|
||||
}
|
||||
|
||||
const { headers, rows } = sanitizeHeadersAndRows(rawHeaders, rawRows)
|
||||
const columns = inferSchema(headers, rows)
|
||||
const tableName = args.name || file.name.replace(/\.[^.]+$/, '')
|
||||
const tableName =
|
||||
args.name || sanitizeName(file.name.replace(/\.[^.]+$/, ''), 'imported_table')
|
||||
const requestId = crypto.randomUUID().slice(0, 8)
|
||||
const table = await createTable(
|
||||
{
|
||||
@@ -663,7 +709,7 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
|
||||
const coerced = coerceRows(rows, columns, columnMap)
|
||||
const inserted = await batchInsertAll(table.id, coerced, table, workspaceId)
|
||||
|
||||
logger.info('Table created from file', {
|
||||
logger.info('Table created from JSON file', {
|
||||
tableId: table.id,
|
||||
fileName: file.name,
|
||||
columns: columns.length,
|
||||
@@ -703,11 +749,34 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
|
||||
}
|
||||
|
||||
const file = await resolveWorkspaceFile(filePath, workspaceId)
|
||||
const { headers, rows } = await parseFileRows(file.buffer, file.name, file.type)
|
||||
if (rows.length === 0) {
|
||||
const ext = file.name.split('.').pop()?.toLowerCase()
|
||||
const isJson = ext === 'json' || file.type === 'application/json'
|
||||
const isCsv = isCsvFile(file.name, file.type)
|
||||
if (!isJson && !isCsv) {
|
||||
return {
|
||||
success: false,
|
||||
message: `Unsupported file format: "${ext}". Supported: csv, tsv, json`,
|
||||
}
|
||||
}
|
||||
|
||||
let rawHeaders: string[]
|
||||
let rawRows: Record<string, unknown>[]
|
||||
if (isJson) {
|
||||
const parsed = await parseJsonRows(file.buffer)
|
||||
rawHeaders = parsed.headers
|
||||
rawRows = parsed.rows
|
||||
} else {
|
||||
const { parseCsvBuffer } = await import('@/app/api/table/import-csv/route')
|
||||
const parsed = await parseCsvBuffer(file.buffer)
|
||||
rawHeaders = parsed.headers
|
||||
rawRows = parsed.rows
|
||||
}
|
||||
if (rawRows.length === 0) {
|
||||
return { success: false, message: 'File contains no data rows' }
|
||||
}
|
||||
|
||||
const { headers, rows } = sanitizeHeadersAndRows(rawHeaders, rawRows)
|
||||
|
||||
const tableColumns = table.schema.columns as ColumnDefinition[]
|
||||
const tableColNames = new Set(tableColumns.map((c) => c.name))
|
||||
const mappedHeaders = headers.filter((h) => tableColNames.has(h))
|
||||
|
||||
Reference in New Issue
Block a user