feat(sync): updated db and fixed syncing value (#69)

* feat(sync): updated db and fixed syncing value

* solved merge conflicts

* fix(sync): updated routes for settings

* fixed merge conflicts

* feat(sync): syncing workflow deletion

* merged changes
This commit is contained in:
Emir Karabeg
2025-02-18 21:41:40 -08:00
committed by GitHub
parent 666e23fc12
commit 77f671c103
8 changed files with 794 additions and 32 deletions

View File

@@ -10,7 +10,7 @@ const WorkflowSchema = z.object({
id: z.string(),
name: z.string(),
description: z.string().optional(),
state: z.string(), // JSON stringified workflow state
state: z.record(z.any()),
})
// Define the schema for batch sync

View File

@@ -34,13 +34,13 @@ export async function POST(request: Request) {
.values({
id: nanoid(),
userId,
variables: JSON.stringify(Object.fromEntries(securedData)),
variables: Object.fromEntries(securedData),
updatedAt: new Date(),
})
.onConflictDoUpdate({
target: [userEnvironment.userId],
set: {
variables: JSON.stringify(Object.fromEntries(securedData)),
variables: Object.fromEntries(securedData),
updatedAt: new Date(),
},
})
@@ -67,14 +67,14 @@ export async function GET(request: Request) {
.where(eq(userEnvironment.userId, userId))
.limit(1)
if (!result.length) {
if (!result.length || !result[0].variables) {
return NextResponse.json({ data: {} }, { status: 200 })
}
// Parse the variables and return just the structure without the hashed values
const variables = JSON.parse(result[0].variables)
const variables = result[0].variables as Record<string, any>
const sanitizedVariables = Object.fromEntries(
Object.entries(variables).map(([key, value]: [string, any]) => [
Object.entries(variables).map(([key, value]) => [
key,
{ key, value: '••••••••' }, // Hide the actual value
])

View File

@@ -3,7 +3,7 @@ import { eq } from 'drizzle-orm'
import { nanoid } from 'nanoid'
import { z } from 'zod'
import { db } from '@/db'
import { userSettings } from '@/db/schema'
import { settings } from '@/db/schema'
const SettingsSchema = z.object({
userId: z.string(),
@@ -17,17 +17,17 @@ export async function POST(request: Request) {
// Store the settings
await db
.insert(userSettings)
.insert(settings)
.values({
id: nanoid(),
userId,
isAutoConnectEnabled,
general: { isAutoConnectEnabled },
updatedAt: new Date(),
})
.onConflictDoUpdate({
target: [userSettings.userId],
target: [settings.userId],
set: {
isAutoConnectEnabled,
general: { isAutoConnectEnabled },
updatedAt: new Date(),
},
})
@@ -48,11 +48,7 @@ export async function GET(request: Request) {
return NextResponse.json({ error: 'userId is required' }, { status: 400 })
}
const result = await db
.select()
.from(userSettings)
.where(eq(userSettings.userId, userId))
.limit(1)
const result = await db.select().from(settings).where(eq(settings.userId, userId)).limit(1)
if (!result.length) {
return NextResponse.json(
@@ -65,10 +61,11 @@ export async function GET(request: Request) {
)
}
const generalSettings = result[0].general as { isAutoConnectEnabled: boolean }
return NextResponse.json(
{
data: {
isAutoConnectEnabled: result[0].isAutoConnectEnabled,
isAutoConnectEnabled: generalSettings.isAutoConnectEnabled,
},
},
{ status: 200 }

View File

@@ -0,0 +1,35 @@
CREATE TABLE "workflow_logs" (
"id" text PRIMARY KEY NOT NULL,
"workflow_id" text NOT NULL,
"execution_id" text,
"level" text NOT NULL,
"message" text NOT NULL,
"created_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
CREATE TABLE "settings" (
"id" text PRIMARY KEY NOT NULL,
"user_id" text NOT NULL,
"general" json NOT NULL,
"updated_at" timestamp DEFAULT now() NOT NULL,
CONSTRAINT "settings_user_id_unique" UNIQUE("user_id")
);
--> statement-breakpoint
CREATE TABLE "environment" (
"id" text PRIMARY KEY NOT NULL,
"user_id" text NOT NULL,
"variables" json NOT NULL,
"updated_at" timestamp DEFAULT now() NOT NULL,
CONSTRAINT "environment_user_id_unique" UNIQUE("user_id")
);
--> statement-breakpoint
ALTER TABLE "logs" DISABLE ROW LEVEL SECURITY;--> statement-breakpoint
ALTER TABLE "user_environment" DISABLE ROW LEVEL SECURITY;--> statement-breakpoint
ALTER TABLE "user_settings" DISABLE ROW LEVEL SECURITY;--> statement-breakpoint
DROP TABLE "logs" CASCADE;--> statement-breakpoint
DROP TABLE "user_environment" CASCADE;--> statement-breakpoint
DROP TABLE "user_settings" CASCADE;--> statement-breakpoint
ALTER TABLE "workflow" ALTER COLUMN "state" SET DATA TYPE json;--> statement-breakpoint
ALTER TABLE "workflow_logs" ADD CONSTRAINT "workflow_logs_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "settings" ADD CONSTRAINT "settings_user_id_user_id_fk" FOREIGN KEY ("user_id") REFERENCES "public"."user"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "environment" ADD CONSTRAINT "environment_user_id_user_id_fk" FOREIGN KEY ("user_id") REFERENCES "public"."user"("id") ON DELETE cascade ON UPDATE no action;

View File

@@ -0,0 +1,678 @@
{
"id": "6ee96eef-e39f-4051-83dc-74ec54eebc20",
"prevId": "d51461fe-443b-47dc-bdb1-8ff9f3c17ab6",
"version": "7",
"dialect": "postgresql",
"tables": {
"public.account": {
"name": "account",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true
},
"account_id": {
"name": "account_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"provider_id": {
"name": "provider_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"user_id": {
"name": "user_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"access_token": {
"name": "access_token",
"type": "text",
"primaryKey": false,
"notNull": false
},
"refresh_token": {
"name": "refresh_token",
"type": "text",
"primaryKey": false,
"notNull": false
},
"id_token": {
"name": "id_token",
"type": "text",
"primaryKey": false,
"notNull": false
},
"access_token_expires_at": {
"name": "access_token_expires_at",
"type": "timestamp",
"primaryKey": false,
"notNull": false
},
"refresh_token_expires_at": {
"name": "refresh_token_expires_at",
"type": "timestamp",
"primaryKey": false,
"notNull": false
},
"scope": {
"name": "scope",
"type": "text",
"primaryKey": false,
"notNull": false
},
"password": {
"name": "password",
"type": "text",
"primaryKey": false,
"notNull": false
},
"created_at": {
"name": "created_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true
},
"updated_at": {
"name": "updated_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true
}
},
"indexes": {},
"foreignKeys": {
"account_user_id_user_id_fk": {
"name": "account_user_id_user_id_fk",
"tableFrom": "account",
"tableTo": "user",
"columnsFrom": ["user_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.workflow_logs": {
"name": "workflow_logs",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true
},
"workflow_id": {
"name": "workflow_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"execution_id": {
"name": "execution_id",
"type": "text",
"primaryKey": false,
"notNull": false
},
"level": {
"name": "level",
"type": "text",
"primaryKey": false,
"notNull": true
},
"message": {
"name": "message",
"type": "text",
"primaryKey": false,
"notNull": true
},
"created_at": {
"name": "created_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {
"workflow_logs_workflow_id_workflow_id_fk": {
"name": "workflow_logs_workflow_id_workflow_id_fk",
"tableFrom": "workflow_logs",
"tableTo": "workflow",
"columnsFrom": ["workflow_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.session": {
"name": "session",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true
},
"expires_at": {
"name": "expires_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true
},
"token": {
"name": "token",
"type": "text",
"primaryKey": false,
"notNull": true
},
"created_at": {
"name": "created_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true
},
"updated_at": {
"name": "updated_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true
},
"ip_address": {
"name": "ip_address",
"type": "text",
"primaryKey": false,
"notNull": false
},
"user_agent": {
"name": "user_agent",
"type": "text",
"primaryKey": false,
"notNull": false
},
"user_id": {
"name": "user_id",
"type": "text",
"primaryKey": false,
"notNull": true
}
},
"indexes": {},
"foreignKeys": {
"session_user_id_user_id_fk": {
"name": "session_user_id_user_id_fk",
"tableFrom": "session",
"tableTo": "user",
"columnsFrom": ["user_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {
"session_token_unique": {
"name": "session_token_unique",
"nullsNotDistinct": false,
"columns": ["token"]
}
},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.settings": {
"name": "settings",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true
},
"user_id": {
"name": "user_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"general": {
"name": "general",
"type": "json",
"primaryKey": false,
"notNull": true
},
"updated_at": {
"name": "updated_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {
"settings_user_id_user_id_fk": {
"name": "settings_user_id_user_id_fk",
"tableFrom": "settings",
"tableTo": "user",
"columnsFrom": ["user_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {
"settings_user_id_unique": {
"name": "settings_user_id_unique",
"nullsNotDistinct": false,
"columns": ["user_id"]
}
},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.user": {
"name": "user",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true
},
"name": {
"name": "name",
"type": "text",
"primaryKey": false,
"notNull": true
},
"email": {
"name": "email",
"type": "text",
"primaryKey": false,
"notNull": true
},
"email_verified": {
"name": "email_verified",
"type": "boolean",
"primaryKey": false,
"notNull": true
},
"image": {
"name": "image",
"type": "text",
"primaryKey": false,
"notNull": false
},
"created_at": {
"name": "created_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true
},
"updated_at": {
"name": "updated_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {
"user_email_unique": {
"name": "user_email_unique",
"nullsNotDistinct": false,
"columns": ["email"]
}
},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.environment": {
"name": "environment",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true
},
"user_id": {
"name": "user_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"variables": {
"name": "variables",
"type": "json",
"primaryKey": false,
"notNull": true
},
"updated_at": {
"name": "updated_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {
"environment_user_id_user_id_fk": {
"name": "environment_user_id_user_id_fk",
"tableFrom": "environment",
"tableTo": "user",
"columnsFrom": ["user_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {
"environment_user_id_unique": {
"name": "environment_user_id_unique",
"nullsNotDistinct": false,
"columns": ["user_id"]
}
},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.verification": {
"name": "verification",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true
},
"identifier": {
"name": "identifier",
"type": "text",
"primaryKey": false,
"notNull": true
},
"value": {
"name": "value",
"type": "text",
"primaryKey": false,
"notNull": true
},
"expires_at": {
"name": "expires_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true
},
"created_at": {
"name": "created_at",
"type": "timestamp",
"primaryKey": false,
"notNull": false
},
"updated_at": {
"name": "updated_at",
"type": "timestamp",
"primaryKey": false,
"notNull": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.waitlist": {
"name": "waitlist",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true
},
"email": {
"name": "email",
"type": "text",
"primaryKey": false,
"notNull": true
},
"status": {
"name": "status",
"type": "text",
"primaryKey": false,
"notNull": true,
"default": "'pending'"
},
"created_at": {
"name": "created_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"updated_at": {
"name": "updated_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {
"waitlist_email_unique": {
"name": "waitlist_email_unique",
"nullsNotDistinct": false,
"columns": ["email"]
}
},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.workflow": {
"name": "workflow",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true
},
"user_id": {
"name": "user_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"name": {
"name": "name",
"type": "text",
"primaryKey": false,
"notNull": true
},
"description": {
"name": "description",
"type": "text",
"primaryKey": false,
"notNull": false
},
"state": {
"name": "state",
"type": "json",
"primaryKey": false,
"notNull": true
},
"last_synced": {
"name": "last_synced",
"type": "timestamp",
"primaryKey": false,
"notNull": true
},
"created_at": {
"name": "created_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true
},
"updated_at": {
"name": "updated_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true
}
},
"indexes": {},
"foreignKeys": {
"workflow_user_id_user_id_fk": {
"name": "workflow_user_id_user_id_fk",
"tableFrom": "workflow",
"tableTo": "user",
"columnsFrom": ["user_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
},
"public.workflow_schedule": {
"name": "workflow_schedule",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true
},
"workflow_id": {
"name": "workflow_id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"cron_expression": {
"name": "cron_expression",
"type": "text",
"primaryKey": false,
"notNull": false
},
"next_run_at": {
"name": "next_run_at",
"type": "timestamp",
"primaryKey": false,
"notNull": false
},
"last_ran_at": {
"name": "last_ran_at",
"type": "timestamp",
"primaryKey": false,
"notNull": false
},
"trigger_type": {
"name": "trigger_type",
"type": "text",
"primaryKey": false,
"notNull": true
},
"timezone": {
"name": "timezone",
"type": "text",
"primaryKey": false,
"notNull": true,
"default": "'UTC'"
},
"created_at": {
"name": "created_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
},
"updated_at": {
"name": "updated_at",
"type": "timestamp",
"primaryKey": false,
"notNull": true,
"default": "now()"
}
},
"indexes": {},
"foreignKeys": {
"workflow_schedule_workflow_id_workflow_id_fk": {
"name": "workflow_schedule_workflow_id_workflow_id_fk",
"tableFrom": "workflow_schedule",
"tableTo": "workflow",
"columnsFrom": ["workflow_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"policies": {},
"checkConstraints": {},
"isRLSEnabled": false
}
},
"enums": {},
"schemas": {},
"sequences": {},
"roles": {},
"policies": {},
"views": {},
"_meta": {
"columns": {},
"schemas": {},
"tables": {}
}
}

View File

@@ -43,6 +43,13 @@
"when": 1739938301181,
"tag": "0005_shocking_domino",
"breakpoints": true
},
{
"idx": 6,
"version": "7",
"when": 1739939661615,
"tag": "0006_plain_zzzax",
"breakpoints": true
}
]
}

View File

@@ -1,4 +1,4 @@
import { boolean, integer, pgTable, text, timestamp, unique } from 'drizzle-orm/pg-core'
import { boolean, integer, json, pgTable, text, timestamp, unique } from 'drizzle-orm/pg-core'
export const user = pgTable('user', {
id: text('id').primaryKey(),
@@ -57,7 +57,7 @@ export const workflow = pgTable('workflow', {
.references(() => user.id, { onDelete: 'cascade' }),
name: text('name').notNull(),
description: text('description'),
state: text('state').notNull(), // JSON stringified workflow state
state: json('state').notNull(),
lastSynced: timestamp('last_synced').notNull(),
createdAt: timestamp('created_at').notNull(),
updatedAt: timestamp('updated_at').notNull(),
@@ -71,7 +71,7 @@ export const waitlist = pgTable('waitlist', {
updatedAt: timestamp('updated_at').notNull().defaultNow(),
})
export const consoleLog = pgTable('logs', {
export const consoleLog = pgTable('workflow_logs', {
id: text('id').primaryKey(),
workflowId: text('workflow_id')
.notNull()
@@ -82,24 +82,23 @@ export const consoleLog = pgTable('logs', {
createdAt: timestamp('created_at').notNull().defaultNow(),
})
export const userEnvironment = pgTable('user_environment', {
export const userEnvironment = pgTable('environment', {
id: text('id').primaryKey(), // Use the user id as the key
userId: text('user_id')
.notNull()
.references(() => user.id, { onDelete: 'cascade' })
.unique(), // One environment per user
variables: text('variables').notNull(), // JSON stringified {key: hashedValue}
variables: json('variables').notNull(),
updatedAt: timestamp('updated_at').notNull().defaultNow(),
})
export const userSettings = pgTable('user_settings', {
export const settings = pgTable('settings', {
id: text('id').primaryKey(), // Use the user id as the key
userId: text('user_id')
.notNull()
.references(() => user.id, { onDelete: 'cascade' })
.unique(), // One settings record per user
isAutoConnectEnabled: boolean('is_auto_connect_enabled').notNull().default(true),
isDebugModeEnabled: boolean('is_debug_mode_enabled').notNull().default(false),
general: json('general').notNull(),
updatedAt: timestamp('updated_at').notNull().defaultNow(),
})

View File

@@ -1,11 +1,17 @@
import { useWorkflowRegistry } from './workflow/registry/store'
import { BlockState } from './workflow/types'
import { mergeSubblockState } from './workflow/utils'
interface WorkflowSyncPayload {
id: string
name: string
description?: string | undefined
state: string
state: {
blocks: Record<string, BlockState>
edges: any
loops: any
lastSaved: any
}
}
// Track deleted workflow IDs until they're synced
@@ -63,16 +69,20 @@ async function performSync() {
id,
name: metadata.name,
description: metadata.description,
state: JSON.stringify({
state: {
blocks: mergedBlocks,
edges: state.edges,
loops: state.loops,
lastSaved: state.lastSaved,
}),
},
}
})
)
// Filter out null values and sync if there are workflows to sync
const validPayloads = syncPayloads.filter(
(payload): payload is WorkflowSyncPayload => payload !== null
)
// Filter out null values and sync if there are workflows to sync
const validPayloads = syncPayloads.filter(
(payload): payload is WorkflowSyncPayload => payload !== null
@@ -90,10 +100,46 @@ export function initializeSyncManager() {
syncInterval = setInterval(performSync, 30000) // Sync every 30 seconds
const handleBeforeUnload = async (event: BeforeUnloadEvent) => {
// Perform one final sync before unloading
event.preventDefault()
event.returnValue = ''
await performSync()
const { workflows } = useWorkflowRegistry.getState()
// Prepare sync payloads for all workflows
const syncPayloads: (WorkflowSyncPayload | null)[] = await Promise.all(
Object.entries(workflows).map(async ([id, metadata]) => {
// Get workflow state from localStorage
const savedState = localStorage.getItem(`workflow-${id}`)
if (!savedState) return null
const state = JSON.parse(savedState)
// Merge subblock states for all blocks in the workflow
const mergedBlocks = mergeSubblockState(state.blocks)
return {
id,
name: metadata.name,
description: metadata.description,
state: {
blocks: mergedBlocks,
edges: state.edges,
loops: state.loops,
lastSaved: state.lastSaved,
},
}
})
)
// Filter out null values and sync if there are workflows to sync
const validPayloads = syncPayloads.filter(
(payload): payload is WorkflowSyncPayload => payload !== null
)
if (validPayloads.length > 0) {
// Show confirmation dialog
event.preventDefault()
event.returnValue = ''
// Attempt to sync
await syncWorkflowsToServer(validPayloads)
}
}
window.addEventListener('beforeunload', handleBeforeUnload)