mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
Table batch ops
This commit is contained in:
@@ -4,11 +4,13 @@ import type { UserTableArgs, UserTableResult } from '@/lib/copilot/tools/shared/
|
||||
import {
|
||||
addTableColumn,
|
||||
batchInsertRows,
|
||||
batchUpdateRows,
|
||||
createTable,
|
||||
deleteColumn,
|
||||
deleteColumns,
|
||||
deleteRow,
|
||||
deleteRowsByFilter,
|
||||
deleteRowsByIds,
|
||||
deleteTable,
|
||||
getRowById,
|
||||
getTableById,
|
||||
@@ -518,6 +520,105 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
|
||||
}
|
||||
}
|
||||
|
||||
case 'batch_update_rows': {
|
||||
if (!args.tableId) {
|
||||
return { success: false, message: 'Table ID is required' }
|
||||
}
|
||||
if (!workspaceId) {
|
||||
return { success: false, message: 'Workspace ID is required' }
|
||||
}
|
||||
|
||||
const rawUpdates = (args as Record<string, unknown>).updates as
|
||||
| Array<{ rowId: string; data: Record<string, unknown> }>
|
||||
| undefined
|
||||
const columnName = (args as Record<string, unknown>).columnName as string | undefined
|
||||
const valuesMap = (args as Record<string, unknown>).values as
|
||||
| Record<string, unknown>
|
||||
| undefined
|
||||
|
||||
let updates: Array<{ rowId: string; data: Record<string, unknown> }>
|
||||
|
||||
if (rawUpdates && rawUpdates.length > 0) {
|
||||
updates = rawUpdates
|
||||
} else if (columnName && valuesMap) {
|
||||
updates = Object.entries(valuesMap).map(([rowId, value]) => ({
|
||||
rowId,
|
||||
data: { [columnName]: value },
|
||||
}))
|
||||
} else {
|
||||
return {
|
||||
success: false,
|
||||
message:
|
||||
'Provide either "updates" array or "columnName" + "values" map',
|
||||
}
|
||||
}
|
||||
|
||||
if (updates.length > MAX_BATCH_SIZE) {
|
||||
return {
|
||||
success: false,
|
||||
message: `Too many updates (${updates.length}). Maximum is ${MAX_BATCH_SIZE}.`,
|
||||
}
|
||||
}
|
||||
|
||||
const table = await getTableById(args.tableId)
|
||||
if (!table) {
|
||||
return { success: false, message: `Table not found: ${args.tableId}` }
|
||||
}
|
||||
|
||||
const requestId = crypto.randomUUID().slice(0, 8)
|
||||
const result = await batchUpdateRows(
|
||||
{
|
||||
tableId: args.tableId,
|
||||
updates: updates as Array<{ rowId: string; data: RowData }>,
|
||||
workspaceId,
|
||||
},
|
||||
table,
|
||||
requestId
|
||||
)
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message: `Updated ${result.affectedCount} rows`,
|
||||
data: { affectedCount: result.affectedCount, affectedRowIds: result.affectedRowIds },
|
||||
}
|
||||
}
|
||||
|
||||
case 'batch_delete_rows': {
|
||||
if (!args.tableId) {
|
||||
return { success: false, message: 'Table ID is required' }
|
||||
}
|
||||
if (!workspaceId) {
|
||||
return { success: false, message: 'Workspace ID is required' }
|
||||
}
|
||||
|
||||
const rowIds = (args as Record<string, unknown>).rowIds as string[] | undefined
|
||||
if (!rowIds || rowIds.length === 0) {
|
||||
return { success: false, message: 'rowIds array is required' }
|
||||
}
|
||||
|
||||
if (rowIds.length > MAX_BATCH_SIZE) {
|
||||
return {
|
||||
success: false,
|
||||
message: `Too many row IDs (${rowIds.length}). Maximum is ${MAX_BATCH_SIZE}.`,
|
||||
}
|
||||
}
|
||||
|
||||
const requestId = crypto.randomUUID().slice(0, 8)
|
||||
const result = await deleteRowsByIds(
|
||||
{ tableId: args.tableId, rowIds, workspaceId },
|
||||
requestId
|
||||
)
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message: `Deleted ${result.deletedCount} rows`,
|
||||
data: {
|
||||
deletedCount: result.deletedCount,
|
||||
deletedRowIds: result.deletedRowIds,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
case 'create_from_file': {
|
||||
const filePath = (args as Record<string, unknown>).filePath as string | undefined
|
||||
if (!filePath) {
|
||||
|
||||
@@ -121,6 +121,8 @@ export const UserTableArgsSchema = z.object({
|
||||
'delete_row',
|
||||
'update_rows_by_filter',
|
||||
'delete_rows_by_filter',
|
||||
'batch_update_rows',
|
||||
'batch_delete_rows',
|
||||
'add_column',
|
||||
'rename_column',
|
||||
'delete_column',
|
||||
@@ -135,6 +137,11 @@ export const UserTableArgsSchema = z.object({
|
||||
rowId: z.string().optional(),
|
||||
data: z.record(z.any()).optional(),
|
||||
rows: z.array(z.record(z.any())).optional(),
|
||||
updates: z
|
||||
.array(z.object({ rowId: z.string(), data: z.record(z.any()) }))
|
||||
.optional(),
|
||||
rowIds: z.array(z.string()).optional(),
|
||||
values: z.record(z.any()).optional(),
|
||||
filter: z.any().optional(),
|
||||
sort: z.record(z.enum(['asc', 'desc'])).optional(),
|
||||
limit: z.number().optional(),
|
||||
|
||||
@@ -10,11 +10,12 @@
|
||||
import { db } from '@sim/db'
|
||||
import { userTableDefinitions, userTableRows } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, count, eq, gt, gte, sql } from 'drizzle-orm'
|
||||
import { and, count, eq, gt, gte, inArray, sql } from 'drizzle-orm'
|
||||
import { COLUMN_TYPES, NAME_PATTERN, TABLE_LIMITS, USER_TABLE_ROWS_SQL_NAME } from './constants'
|
||||
import { buildFilterClause, buildSortClause } from './sql'
|
||||
import type {
|
||||
BatchInsertData,
|
||||
BatchUpdateByIdData,
|
||||
BulkDeleteByIdsData,
|
||||
BulkDeleteByIdsResult,
|
||||
BulkDeleteData,
|
||||
@@ -1092,6 +1093,96 @@ export async function updateRowsByFilter(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates multiple rows with per-row data in a single transaction.
|
||||
* Avoids the race condition of parallel update_row calls overwriting each other.
|
||||
*/
|
||||
export async function batchUpdateRows(
|
||||
data: BatchUpdateByIdData,
|
||||
table: TableDefinition,
|
||||
requestId: string
|
||||
): Promise<BulkOperationResult> {
|
||||
if (data.updates.length === 0) {
|
||||
return { affectedCount: 0, affectedRowIds: [] }
|
||||
}
|
||||
|
||||
const rowIds = data.updates.map((u) => u.rowId)
|
||||
const existingRows = await db
|
||||
.select({ id: userTableRows.id, data: userTableRows.data })
|
||||
.from(userTableRows)
|
||||
.where(
|
||||
and(
|
||||
eq(userTableRows.tableId, data.tableId),
|
||||
eq(userTableRows.workspaceId, data.workspaceId),
|
||||
inArray(userTableRows.id, rowIds)
|
||||
)
|
||||
)
|
||||
|
||||
const existingMap = new Map(existingRows.map((r) => [r.id, r.data as RowData]))
|
||||
|
||||
const missing = rowIds.filter((id) => !existingMap.has(id))
|
||||
if (missing.length > 0) {
|
||||
throw new Error(`Rows not found: ${missing.join(', ')}`)
|
||||
}
|
||||
|
||||
const mergedUpdates: Array<{ rowId: string; mergedData: RowData }> = []
|
||||
for (const update of data.updates) {
|
||||
const existing = existingMap.get(update.rowId)!
|
||||
const merged = { ...existing, ...update.data }
|
||||
|
||||
const sizeValidation = validateRowSize(merged)
|
||||
if (!sizeValidation.valid) {
|
||||
throw new Error(`Row ${update.rowId}: ${sizeValidation.errors.join(', ')}`)
|
||||
}
|
||||
|
||||
const schemaValidation = validateRowAgainstSchema(merged, table.schema)
|
||||
if (!schemaValidation.valid) {
|
||||
throw new Error(`Row ${update.rowId}: ${schemaValidation.errors.join(', ')}`)
|
||||
}
|
||||
|
||||
mergedUpdates.push({ rowId: update.rowId, mergedData: merged })
|
||||
}
|
||||
|
||||
const uniqueColumns = getUniqueColumns(table.schema)
|
||||
if (uniqueColumns.length > 0) {
|
||||
for (const { rowId, mergedData } of mergedUpdates) {
|
||||
const uniqueValidation = await checkUniqueConstraintsDb(
|
||||
data.tableId,
|
||||
mergedData,
|
||||
table.schema,
|
||||
rowId
|
||||
)
|
||||
if (!uniqueValidation.valid) {
|
||||
throw new Error(`Row ${rowId}: ${uniqueValidation.errors.join(', ')}`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const now = new Date()
|
||||
|
||||
await db.transaction(async (trx) => {
|
||||
for (let i = 0; i < mergedUpdates.length; i += TABLE_LIMITS.UPDATE_BATCH_SIZE) {
|
||||
const batch = mergedUpdates.slice(i, i + TABLE_LIMITS.UPDATE_BATCH_SIZE)
|
||||
const updatePromises = batch.map(({ rowId, mergedData }) =>
|
||||
trx
|
||||
.update(userTableRows)
|
||||
.set({ data: mergedData, updatedAt: now })
|
||||
.where(eq(userTableRows.id, rowId))
|
||||
)
|
||||
await Promise.all(updatePromises)
|
||||
}
|
||||
})
|
||||
|
||||
logger.info(
|
||||
`[${requestId}] Batch updated ${mergedUpdates.length} rows in table ${data.tableId}`
|
||||
)
|
||||
|
||||
return {
|
||||
affectedCount: mergedUpdates.length,
|
||||
affectedRowIds: mergedUpdates.map((u) => u.rowId),
|
||||
}
|
||||
}
|
||||
|
||||
type DbTransaction = Parameters<Parameters<typeof db.transaction>[0]>[0]
|
||||
|
||||
/**
|
||||
|
||||
@@ -209,6 +209,12 @@ export interface BulkUpdateData {
|
||||
workspaceId: string
|
||||
}
|
||||
|
||||
export interface BatchUpdateByIdData {
|
||||
tableId: string
|
||||
updates: Array<{ rowId: string; data: RowData }>
|
||||
workspaceId: string
|
||||
}
|
||||
|
||||
export interface BulkDeleteData {
|
||||
tableId: string
|
||||
filter: Filter
|
||||
|
||||
Reference in New Issue
Block a user