This commit is contained in:
Lakee Sivaraya
2026-01-14 16:42:09 -08:00
parent 4422a69a17
commit 15bef489f2
6 changed files with 345 additions and 255 deletions

View File

@@ -389,31 +389,40 @@ export async function DELETE(request: NextRequest, { params }: RowRouteParams) {
}
}
// Delete row
const [deletedRow] = await db
.delete(userTableRows)
.where(
and(
eq(userTableRows.id, rowId),
eq(userTableRows.tableId, tableId),
eq(userTableRows.workspaceId, actualWorkspaceId)
// Delete row in a transaction to ensure atomicity
const deletedRow = await db.transaction(async (trx) => {
// Delete row
const [deleted] = await trx
.delete(userTableRows)
.where(
and(
eq(userTableRows.id, rowId),
eq(userTableRows.tableId, tableId),
eq(userTableRows.workspaceId, actualWorkspaceId)
)
)
)
.returning()
.returning()
if (!deleted) {
return null
}
// Update row count
await trx
.update(userTableDefinitions)
.set({
rowCount: sql`${userTableDefinitions.rowCount} - 1`,
updatedAt: new Date(),
})
.where(eq(userTableDefinitions.id, tableId))
return deleted
})
if (!deletedRow) {
return NextResponse.json({ error: 'Row not found' }, { status: 404 })
}
// Update row count
await db
.update(userTableDefinitions)
.set({
rowCount: sql`${userTableDefinitions.rowCount} - 1`,
updatedAt: new Date(),
})
.where(eq(userTableDefinitions.id, tableId))
logger.info(`[${requestId}] Deleted row ${rowId} from table ${tableId}`)
return NextResponse.json({

View File

@@ -274,7 +274,7 @@ async function handleBatchInsert(
}
}
// Insert all rows
// Insert all rows in a transaction to ensure atomicity
const now = new Date()
const rowsToInsert = validated.rows.map((data) => ({
id: `row_${crypto.randomUUID().replace(/-/g, '')}`,
@@ -286,16 +286,21 @@ async function handleBatchInsert(
createdBy: userId,
}))
const insertedRows = await db.insert(userTableRows).values(rowsToInsert).returning()
const insertedRows = await db.transaction(async (trx) => {
// Insert all rows
const inserted = await trx.insert(userTableRows).values(rowsToInsert).returning()
// Update row count
await db
.update(userTableDefinitions)
.set({
rowCount: sql`${userTableDefinitions.rowCount} + ${validated.rows.length}`,
updatedAt: now,
})
.where(eq(userTableDefinitions.id, tableId))
// Update row count
await trx
.update(userTableDefinitions)
.set({
rowCount: sql`${userTableDefinitions.rowCount} + ${validated.rows.length}`,
updatedAt: now,
})
.where(eq(userTableDefinitions.id, tableId))
return inserted
})
logger.info(`[${requestId}] Batch inserted ${insertedRows.length} rows into table ${tableId}`)
@@ -460,31 +465,36 @@ export async function POST(request: NextRequest, { params }: TableRowsRouteParam
)
}
// Insert row
// Insert row in a transaction to ensure atomicity
const rowId = `row_${crypto.randomUUID().replace(/-/g, '')}`
const now = new Date()
const [row] = await db
.insert(userTableRows)
.values({
id: rowId,
tableId,
workspaceId,
data: validated.data,
createdAt: now,
updatedAt: now,
createdBy: authResult.userId,
})
.returning()
const [row] = await db.transaction(async (trx) => {
// Insert row
const insertedRow = await trx
.insert(userTableRows)
.values({
id: rowId,
tableId,
workspaceId,
data: validated.data,
createdAt: now,
updatedAt: now,
createdBy: authResult.userId,
})
.returning()
// Update row count
await db
.update(userTableDefinitions)
.set({
rowCount: sql`${userTableDefinitions.rowCount} + 1`,
updatedAt: now,
})
.where(eq(userTableDefinitions.id, tableId))
// Update row count
await trx
.update(userTableDefinitions)
.set({
rowCount: sql`${userTableDefinitions.rowCount} + 1`,
updatedAt: now,
})
.where(eq(userTableDefinitions.id, tableId))
return insertedRow
})
logger.info(`[${requestId}] Inserted row ${rowId} into table ${tableId}`)
@@ -849,29 +859,33 @@ export async function PUT(request: NextRequest, { params }: TableRowsRouteParams
}
}
// Update rows by merging existing data with new data in batches
// Update rows by merging existing data with new data in a transaction
const now = new Date()
const BATCH_SIZE = 100 // Smaller batch for updates since each is a separate query
let totalUpdated = 0
for (let i = 0; i < matchingRows.length; i += BATCH_SIZE) {
const batch = matchingRows.slice(i, i + BATCH_SIZE)
const updatePromises = batch.map((row) => {
const existingData = row.data as RowData
return db
.update(userTableRows)
.set({
data: { ...existingData, ...updateData },
updatedAt: now,
})
.where(eq(userTableRows.id, row.id))
})
await Promise.all(updatePromises)
totalUpdated += batch.length
logger.info(
`[${requestId}] Updated batch ${Math.floor(i / BATCH_SIZE) + 1} (${totalUpdated}/${matchingRows.length} rows)`
)
}
await db.transaction(async (trx) => {
let totalUpdated = 0
// Process updates in batches
for (let i = 0; i < matchingRows.length; i += BATCH_SIZE) {
const batch = matchingRows.slice(i, i + BATCH_SIZE)
const updatePromises = batch.map((row) => {
const existingData = row.data as RowData
return trx
.update(userTableRows)
.set({
data: { ...existingData, ...updateData },
updatedAt: now,
})
.where(eq(userTableRows.id, row.id))
})
await Promise.all(updatePromises)
totalUpdated += batch.length
logger.info(
`[${requestId}] Updated batch ${Math.floor(i / BATCH_SIZE) + 1} (${totalUpdated}/${matchingRows.length} rows)`
)
}
})
logger.info(`[${requestId}] Updated ${matchingRows.length} rows in table ${tableId}`)
@@ -999,37 +1013,41 @@ export async function DELETE(request: NextRequest, { params }: TableRowsRoutePar
logger.warn(`[${requestId}] Deleting ${matchingRows.length} rows. This may take some time.`)
}
// Delete the matching rows in batches to avoid stack overflow
// Delete the matching rows in a transaction to ensure atomicity
const rowIds = matchingRows.map((r) => r.id)
const BATCH_SIZE = 1000
let totalDeleted = 0
for (let i = 0; i < rowIds.length; i += BATCH_SIZE) {
const batch = rowIds.slice(i, i + BATCH_SIZE)
await db.delete(userTableRows).where(
and(
eq(userTableRows.tableId, tableId),
eq(userTableRows.workspaceId, actualWorkspaceId),
sql`${userTableRows.id} = ANY(ARRAY[${sql.join(
batch.map((id) => sql`${id}`),
sql`, `
)}])`
await db.transaction(async (trx) => {
let totalDeleted = 0
// Delete rows in batches to avoid stack overflow
for (let i = 0; i < rowIds.length; i += BATCH_SIZE) {
const batch = rowIds.slice(i, i + BATCH_SIZE)
await trx.delete(userTableRows).where(
and(
eq(userTableRows.tableId, tableId),
eq(userTableRows.workspaceId, actualWorkspaceId),
sql`${userTableRows.id} = ANY(ARRAY[${sql.join(
batch.map((id) => sql`${id}`),
sql`, `
)}])`
)
)
)
totalDeleted += batch.length
logger.info(
`[${requestId}] Deleted batch ${Math.floor(i / BATCH_SIZE) + 1} (${totalDeleted}/${rowIds.length} rows)`
)
}
totalDeleted += batch.length
logger.info(
`[${requestId}] Deleted batch ${Math.floor(i / BATCH_SIZE) + 1} (${totalDeleted}/${rowIds.length} rows)`
)
}
// Update row count
await db
.update(userTableDefinitions)
.set({
rowCount: sql`${userTableDefinitions.rowCount} - ${matchingRows.length}`,
updatedAt: new Date(),
})
.where(eq(userTableDefinitions.id, tableId))
// Update row count
await trx
.update(userTableDefinitions)
.set({
rowCount: sql`${userTableDefinitions.rowCount} - ${matchingRows.length}`,
updatedAt: new Date(),
})
.where(eq(userTableDefinitions.id, tableId))
})
logger.info(`[${requestId}] Deleted ${matchingRows.length} rows from table ${tableId}`)

View File

@@ -208,68 +208,69 @@ export async function POST(request: NextRequest, { params }: UpsertRouteParams)
const now = new Date()
if (existingRow) {
// Update existing row
const [updatedRow] = await db
.update(userTableRows)
.set({
// Perform upsert in a transaction to ensure atomicity
const result = await db.transaction(async (trx) => {
if (existingRow) {
// Update existing row
const [updatedRow] = await trx
.update(userTableRows)
.set({
data: validated.data,
updatedAt: now,
})
.where(eq(userTableRows.id, existingRow.id))
.returning()
return {
row: updatedRow,
operation: 'update' as const,
}
}
// Insert new row
const [insertedRow] = await trx
.insert(userTableRows)
.values({
id: `row_${crypto.randomUUID().replace(/-/g, '')}`,
tableId,
workspaceId: actualWorkspaceId,
data: validated.data,
createdAt: now,
updatedAt: now,
createdBy: authResult.userId,
})
.where(eq(userTableRows.id, existingRow.id))
.returning()
logger.info(`[${requestId}] Upserted (updated) row ${updatedRow.id} in table ${tableId}`)
// Update row count for insert
await trx
.update(userTableDefinitions)
.set({
rowCount: sql`${userTableDefinitions.rowCount} + 1`,
updatedAt: now,
})
.where(eq(userTableDefinitions.id, tableId))
return NextResponse.json({
success: true,
data: {
row: {
id: updatedRow.id,
data: updatedRow.data,
createdAt: updatedRow.createdAt.toISOString(),
updatedAt: updatedRow.updatedAt.toISOString(),
},
operation: 'update',
message: 'Row updated successfully',
},
})
}
// Insert new row
const [insertedRow] = await db
.insert(userTableRows)
.values({
tableId,
workspaceId: actualWorkspaceId,
data: validated.data,
createdAt: now,
updatedAt: now,
createdBy: authResult.userId,
})
.returning()
return {
row: insertedRow,
operation: 'insert' as const,
}
})
// Update row count
await db
.update(userTableDefinitions)
.set({
rowCount: sql`${userTableDefinitions.rowCount} + 1`,
updatedAt: now,
})
.where(eq(userTableDefinitions.id, tableId))
logger.info(`[${requestId}] Upserted (inserted) row ${insertedRow.id} in table ${tableId}`)
logger.info(
`[${requestId}] Upserted (${result.operation}) row ${result.row.id} in table ${tableId}`
)
return NextResponse.json({
success: true,
data: {
row: {
id: insertedRow.id,
data: insertedRow.data,
createdAt: insertedRow.createdAt.toISOString(),
updatedAt: insertedRow.updatedAt.toISOString(),
id: result.row.id,
data: result.row.data,
createdAt: result.row.createdAt.toISOString(),
updatedAt: result.row.updatedAt.toISOString(),
},
operation: 'insert',
message: 'Row inserted successfully',
operation: result.operation,
message: `Row ${result.operation === 'update' ? 'updated' : 'inserted'} successfully`,
},
})
} catch (error) {

View File

@@ -80,6 +80,86 @@ export interface TableAccessDenied {
*/
export type TableAccessCheck = TableAccessResult | TableAccessDenied
/**
* Permission level required for table access.
* - 'read': Any workspace permission (read, write, or admin)
* - 'write': Write or admin permission required
* - 'admin': Admin permission required
*/
export type TablePermissionLevel = 'read' | 'write' | 'admin'
/**
* Internal function to check if a user has the required permission level for a table.
*
* Access is granted if:
* 1. User created the table directly, OR
* 2. User has the required permission level on the table's workspace
*
* @param tableId - The unique identifier of the table to check
* @param userId - The unique identifier of the user requesting access
* @param requiredLevel - The minimum permission level required ('read', 'write', or 'admin')
* @returns A promise resolving to the access check result
*
* @internal
*/
async function checkTableAccessInternal(
tableId: string,
userId: string,
requiredLevel: TablePermissionLevel
): Promise<TableAccessCheck> {
// Fetch table data
const table = await db
.select({
id: userTableDefinitions.id,
createdBy: userTableDefinitions.createdBy,
workspaceId: userTableDefinitions.workspaceId,
})
.from(userTableDefinitions)
.where(and(eq(userTableDefinitions.id, tableId), isNull(userTableDefinitions.deletedAt)))
.limit(1)
if (table.length === 0) {
return { hasAccess: false, notFound: true }
}
const tableData = table[0]
// Case 1: User created the table directly (always has full access)
if (tableData.createdBy === userId) {
return { hasAccess: true, table: tableData }
}
// Case 2: Check workspace permissions
const userPermission = await getUserEntityPermissions(userId, 'workspace', tableData.workspaceId)
if (userPermission === null) {
return { hasAccess: false }
}
// Determine if user has sufficient permission level
const hasAccess = (() => {
switch (requiredLevel) {
case 'read':
// Any permission level grants read access
return true
case 'write':
// Write or admin permission required
return userPermission === 'write' || userPermission === 'admin'
case 'admin':
// Only admin permission grants admin access
return userPermission === 'admin'
default:
return false
}
})()
if (hasAccess) {
return { hasAccess: true, table: tableData }
}
return { hasAccess: false }
}
/**
* Checks if a user has read access to a table.
*
@@ -104,34 +184,7 @@ export type TableAccessCheck = TableAccessResult | TableAccessDenied
* ```
*/
export async function checkTableAccess(tableId: string, userId: string): Promise<TableAccessCheck> {
const table = await db
.select({
id: userTableDefinitions.id,
createdBy: userTableDefinitions.createdBy,
workspaceId: userTableDefinitions.workspaceId,
})
.from(userTableDefinitions)
.where(and(eq(userTableDefinitions.id, tableId), isNull(userTableDefinitions.deletedAt)))
.limit(1)
if (table.length === 0) {
return { hasAccess: false, notFound: true }
}
const tableData = table[0]
// Case 1: User created the table directly
if (tableData.createdBy === userId) {
return { hasAccess: true, table: tableData }
}
// Case 2: Table belongs to a workspace the user has permissions for
const userPermission = await getUserEntityPermissions(userId, 'workspace', tableData.workspaceId)
if (userPermission !== null) {
return { hasAccess: true, table: tableData }
}
return { hasAccess: false }
return checkTableAccessInternal(tableId, userId, 'read')
}
/**
@@ -158,34 +211,7 @@ export async function checkTableWriteAccess(
tableId: string,
userId: string
): Promise<TableAccessCheck> {
const table = await db
.select({
id: userTableDefinitions.id,
createdBy: userTableDefinitions.createdBy,
workspaceId: userTableDefinitions.workspaceId,
})
.from(userTableDefinitions)
.where(and(eq(userTableDefinitions.id, tableId), isNull(userTableDefinitions.deletedAt)))
.limit(1)
if (table.length === 0) {
return { hasAccess: false, notFound: true }
}
const tableData = table[0]
// Case 1: User created the table directly
if (tableData.createdBy === userId) {
return { hasAccess: true, table: tableData }
}
// Case 2: Table belongs to a workspace and user has write/admin permissions
const userPermission = await getUserEntityPermissions(userId, 'workspace', tableData.workspaceId)
if (userPermission === 'write' || userPermission === 'admin') {
return { hasAccess: true, table: tableData }
}
return { hasAccess: false }
return checkTableAccessInternal(tableId, userId, 'write')
}
/**

View File

@@ -1,12 +1,13 @@
'use client'
import { useCallback, useEffect, useMemo, useState } from 'react'
import { useCallback, useMemo } from 'react'
import { ExternalLink } from 'lucide-react'
import { useParams } from 'next/navigation'
import { Combobox, type ComboboxOption, Tooltip } from '@/components/emcn'
import { Button } from '@/components/ui/button'
import { useSubBlockValue } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/hooks/use-sub-block-value'
import type { SubBlockConfig } from '@/blocks/types'
import { useTablesList } from '@/hooks/queries/use-tables'
interface TableSelectorProps {
blockId: string
@@ -16,17 +17,13 @@ interface TableSelectorProps {
previewValue?: string | null
}
interface TableOption {
label: string
id: string
}
/**
* Table selector component with dropdown and link to view table
*
* @remarks
* Provides a dropdown to select workspace tables and an external link
* to navigate directly to the table page view when a table is selected.
* Uses React Query for efficient data fetching and caching.
*/
export function TableSelector({
blockId,
@@ -39,52 +36,20 @@ export function TableSelector({
const workspaceId = params.workspaceId as string
const [storeValue, setStoreValue] = useSubBlockValue<string>(blockId, subBlock.id)
const [tables, setTables] = useState<TableOption[]>([])
const [isLoading, setIsLoading] = useState(false)
const [error, setError] = useState<string | null>(null)
// Use React Query hook for table data - it handles caching, loading, and error states
const {
data: tables = [],
isLoading,
error,
} = useTablesList(isPreview || disabled ? undefined : workspaceId)
const value = isPreview ? previewValue : storeValue
const tableId = typeof value === 'string' ? value : null
/**
* Fetches available tables from the API
*/
const fetchTables = useCallback(async () => {
if (!workspaceId || isPreview || disabled) return
setIsLoading(true)
setError(null)
try {
const response = await fetch(`/api/table?workspaceId=${workspaceId}`)
if (!response.ok) {
throw new Error('Failed to fetch tables')
}
const data = await response.json()
const tableOptions = (data.data?.tables || []).map((table: { id: string; name: string }) => ({
label: table.name,
id: table.id,
}))
setTables(tableOptions)
} catch (err) {
const errorMessage = err instanceof Error ? err.message : 'Failed to fetch tables'
setError(errorMessage)
setTables([])
} finally {
setIsLoading(false)
}
}, [workspaceId, isPreview, disabled])
useEffect(() => {
if (!isPreview && !disabled && tables.length === 0 && !isLoading && !error) {
void fetchTables()
}
}, [fetchTables, isPreview, disabled, tables.length, isLoading, error])
const options = useMemo<ComboboxOption[]>(() => {
return tables.map((table) => ({
label: table.label.toLowerCase(),
label: table.name.toLowerCase(),
value: table.id,
}))
}, [tables])
@@ -105,6 +70,9 @@ export function TableSelector({
const hasSelectedTable = tableId && !tableId.startsWith('<')
// Convert error object to string if needed
const errorMessage = error instanceof Error ? error.message : error ? String(error) : undefined
return (
<div className='flex items-center gap-[6px]'>
<div className='flex-1'>
@@ -115,13 +83,8 @@ export function TableSelector({
placeholder={subBlock.placeholder || 'Select a table'}
disabled={disabled || isPreview}
editable={false}
onOpenChange={(open) => {
if (open) {
void fetchTables()
}
}}
isLoading={isLoading}
error={error}
error={errorMessage}
searchable={options.length > 5}
searchPlaceholder='Search...'
/>

View File

@@ -7,6 +7,7 @@
import type { SQL } from 'drizzle-orm'
import { sql } from 'drizzle-orm'
import { NAME_PATTERN } from './constants'
type JsonValue = string | number | boolean | null | JsonValue[] | { [key: string]: JsonValue }
@@ -28,6 +29,55 @@ export interface QueryFilter {
[key: string]: JsonValue | FieldCondition | QueryFilter[] | undefined
}
/**
* Whitelist of allowed operators for query filtering.
* Only these operators can be used in filter conditions.
*/
const ALLOWED_OPERATORS = new Set([
'$eq',
'$ne',
'$gt',
'$gte',
'$lt',
'$lte',
'$in',
'$nin',
'$contains',
])
/**
* Validates a field name to prevent SQL injection.
* Field names must match the NAME_PATTERN (alphanumeric + underscore, starting with letter/underscore).
*
* @param field - The field name to validate
* @throws Error if field name is invalid
*/
function validateFieldName(field: string): void {
if (!field || typeof field !== 'string') {
throw new Error('Field name must be a non-empty string')
}
if (!NAME_PATTERN.test(field)) {
throw new Error(
`Invalid field name "${field}". Field names must start with a letter or underscore, followed by alphanumeric characters or underscores.`
)
}
}
/**
* Validates an operator to ensure it's in the allowed list.
*
* @param operator - The operator to validate
* @throws Error if operator is not allowed
*/
function validateOperator(operator: string): void {
if (!ALLOWED_OPERATORS.has(operator)) {
throw new Error(
`Invalid operator "${operator}". Allowed operators: ${Array.from(ALLOWED_OPERATORS).join(', ')}`
)
}
}
/**
* Builds a JSONB containment clause using GIN index.
* Generates: `table.data @> '{"field": value}'::jsonb`
@@ -42,11 +92,17 @@ function buildFieldCondition(
field: string,
condition: JsonValue | FieldCondition
): SQL[] {
// Validate field name to prevent SQL injection
validateFieldName(field)
const conditions: SQL[] = []
const escapedField = field.replace(/'/g, "''")
if (typeof condition === 'object' && condition !== null && !Array.isArray(condition)) {
for (const [op, value] of Object.entries(condition)) {
// Validate operator to ensure only allowed operators are used
validateOperator(op)
switch (op) {
case '$eq':
conditions.push(buildContainmentClause(tableName, field, value as JsonValue))
@@ -107,6 +163,10 @@ function buildFieldCondition(
sql`${sql.raw(`${tableName}.data->>'${escapedField}'`)} ILIKE ${`%${value}%`}`
)
break
default:
// This should never happen due to validateOperator, but added for completeness
throw new Error(`Unsupported operator: ${op}`)
}
}
} else {
@@ -221,6 +281,11 @@ export function buildFilterClause(filter: QueryFilter, tableName: string): SQL |
/**
* Builds an ORDER BY clause from a sort object.
* Note: JSONB fields use text extraction, so numeric sorting may not work as expected.
*
* @param sort - Sort object with field names and directions
* @param tableName - Table name for the query
* @returns SQL ORDER BY clause or undefined if no sort specified
* @throws Error if field name is invalid
*/
export function buildSortClause(
sort: Record<string, 'asc' | 'desc'>,
@@ -233,10 +298,18 @@ export function buildSortClause(
* - For `createdAt` and `updatedAt`, use the top-level table columns for proper type sorting.
* - For all other fields, treat them as keys in the table's data JSONB column.
* Extraction is performed with ->> to return text, which is then sorted.
* - Field names are escaped for safety.
* - Field names are validated to prevent SQL injection.
*/
for (const [field, direction] of Object.entries(sort)) {
// Escape single quotes for SQL safety
// Validate field name to prevent SQL injection
validateFieldName(field)
// Validate direction
if (direction !== 'asc' && direction !== 'desc') {
throw new Error(`Invalid sort direction "${direction}". Must be "asc" or "desc".`)
}
// Escape single quotes for SQL safety (defense in depth)
const escapedField = field.replace(/'/g, "''")
if (field === 'createdAt' || field === 'updatedAt') {