Compare commits

..

43 Commits

Author SHA1 Message Date
Waleed
028bc652c2 v0.5.76: posthog improvements, readme updates 2026-01-29 00:13:19 -08:00
Waleed
c6bf5cd58c v0.5.75: search modal overhaul, helm chart updates, run from block, terminal and visual debugging improvements 2026-01-28 22:54:13 -08:00
Vikhyath Mondreti
11dc18a80d v0.5.74: autolayout improvements, clerk integration, auth enforcements 2026-01-27 20:37:39 -08:00
Waleed
ab4e9dc72f v0.5.73: ci, helm updates, kb, ui fixes, note block enhancements 2026-01-26 22:04:35 -08:00
Vikhyath Mondreti
1c58c35bd8 v0.5.72: azure connection string, supabase improvement, multitrigger resolution, docs quick reference 2026-01-25 23:42:27 -08:00
Waleed
d63a5cb504 v0.5.71: ux, ci improvements, docs updates 2026-01-25 03:08:08 -08:00
Waleed
8bd5d41723 v0.5.70: router fix, anthropic agent response format adherence 2026-01-24 20:57:02 -08:00
Waleed
c12931bc50 v0.5.69: kb upgrades, blog, copilot improvements, auth consolidation (#2973)
* fix(subflows): tag dropdown + resolution logic (#2949)

* fix(subflows): tag dropdown + resolution logic

* fixes;

* revert parallel change

* chore(deps): bump posthog-js to 1.334.1 (#2948)

* fix(idempotency): add conflict target to atomicallyClaimDb query + remove redundant db namespace tracking (#2950) (see the sketch after this group)

* fix(idempotency): add conflict target to atomicallyClaimDb query

* delete needs to account for namespace

* simplify namespace filtering logic

* fix cleanup

* consistent target
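A generic sketch of the conflict-target claim pattern referenced above, written against the pg client with a hypothetical idempotency_keys table and column names (not the project's actual atomicallyClaimDb implementation):

import { Pool } from 'pg'

const pool = new Pool()

// Attempt to claim an idempotency key atomically. The unique index on
// (namespace, key) is the conflict target: exactly one caller inserts the row;
// every other concurrent caller hits ON CONFLICT and sees rowCount 0.
async function atomicallyClaim(namespace: string, key: string): Promise<boolean> {
  const result = await pool.query(
    `INSERT INTO idempotency_keys (namespace, key, claimed_at)
     VALUES ($1, $2, NOW())
     ON CONFLICT (namespace, key) DO NOTHING`,
    [namespace, key]
  )
  return result.rowCount === 1
}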

* improvement(kb): add document filtering, select all, and React Query migration (#2951)

* improvement(kb): add document filtering, select all, and React Query migration

* test(kb): update tests for enabledFilter and removed userId params

* fix(kb): remove non-null assertion, add explicit guard

* improvement(logs): trace span, details (#2952)

* improvement(action-bar): ordering

* improvement(logs): details, trace span

* feat(blog): v0.5 release post (#2953)

* feat(blog): v0.5 post

* improvement(blog): simplify title and remove code block header

- Simplified blog title from Introducing Sim Studio v0.5 to Introducing Sim v0.5
- Removed language label header and copy button from code blocks for cleaner appearance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* ack PR comments

* small styling improvements

* created system to create post-specific components

* updated component

* cache invalidation

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>

* feat(admin): add credits endpoint to issue credits to users (#2954)

* feat(admin): add credits endpoint to issue credits to users

* fix(admin): use existing credit functions and handle enterprise seats

* fix(admin): reject NaN and Infinity in amount validation

* styling

* fix(admin): validate userId and email are strings

* improvement(copilot): fast mode, subagent tool responses and allow preferences (#2955)

* Improvements

* Fix actions mapping

* Remove console logs

* fix(billing): handle missing userStats and prevent crashes (#2956)

* fix(billing): handle missing userStats and prevent crashes

* fix(billing): correct import path for getFilledPillColor

* fix(billing): add Number.isFinite check to lastPeriodCost

* fix(logs): refresh logic to refresh logs details (#2958)

* fix(security): add authentication and input validation to API routes (#2959)

* fix(security): add authentication and input validation to API routes

* moved utils

* remove extraneous comments

* removed unused dep

* improvement(helm): add internal ingress support and same-host path consolidation (#2960)

* improvement(helm): add internal ingress support and same-host path consolidation

* improvement(helm): clean up ingress template comments

Simplify verbose inline Helm comments and section dividers to match the
minimal style used in services.yaml.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(helm): add missing copilot path consolidation for realtime host

When copilot.host equals realtime.host but differs from app.host,
copilot paths were not being routed. Added logic to consolidate
copilot paths into the realtime rule for this scenario.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* improvement(helm): follow ingress best practices

- Remove orphan comments that appeared when services were disabled
- Add documentation about path ordering requirements
- Paths rendered in order: realtime, copilot, app (specific before catch-all)
- Clean template output matching industry Helm chart standards

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>

* feat(blog): enterprise post (#2961)

* feat(blog): enterprise post

* added more images, styling

* more content

* updated v0-5 post

* remove unused transition

---------

Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>

* fix(envvars): resolution standardized (#2957)

* fix(envvars): resolution standardized

* remove comments

* address bugbot

* fix highlighting for env vars

* remove comments

* address greptile

* address bugbot

* fix(copilot): mask credentials fix (#2963)

* Fix copilot masking

* Clean up

* Lint

* improvement(webhooks): remove dead code (#2965)

* fix(webhooks): subscription recreation path

* improvement(webhooks): remove dead code

* fix tests

* address bugbot comments

* fix restoration edge case

* fix more edge cases

* address bugbot comments

* fix gmail polling

* add warnings for UI indication for credential sets

* fix(preview): subblock values (#2969)

* fix(child-workflow): nested spans handoff (#2966)

* fix(child-workflow): nested spans handoff

* remove overly defensive programming

* update type check

* type more code

* remove more dead code

* address bugbot comments

* fix(security): restrict API key access on internal-only routes (#2964)

* fix(security): restrict API key access on internal-only routes

* test(security): update function execute tests for checkInternalAuth

* updated agent handler

* move session check higher in checkSessionOrInternalAuth

* extracted duplicate code into helper for resolving user from jwt

* fix(copilot): update copilot chat title (#2968)

* fix(hitl): fix condition blocks after hitl (#2967)

* fix(notes): ghost edges (#2970)

* fix(notes): ghost edges

* fix deployed state fallback

* fallback

* remove UI level checks

* annotation missing from autoconnect source check

* improvement(docs): loop and parallel var reference syntax (#2975)

* fix(blog): slash actions description (#2976)

* improvement(docs): loop and parallel var reference syntax

* fix(blog): slash actions description

* fix(auth): copilot routes (#2977)

* Fix copilot auth

* Fix

* Fix

* Fix

* fix(copilot): fix edit summary for loops/parallels (#2978)

* fix(integrations): hide from tool bar (#2544)

* fix(landing): ui (#2979)

* fix(edge-validation): race condition on collaborative add (#2980)

* fix(variables): boolean type support and input improvements (#2981)

* fix(variables): boolean type support and input improvements

* fix formatting

---------

Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
Co-authored-by: Emir Karabeg <78010029+emir-karabeg@users.noreply.github.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: Siddharth Ganesan <33737564+Sg312@users.noreply.github.com>
Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
2026-01-24 14:29:53 -08:00
Waleed
e9c4251c1c v0.5.68: router block reasoning, executor improvements, variable resolution consolidation, helm updates (#2946)
* improvement(workflow-item): stabilize avatar layout and fix name truncation (#2939)

* improvement(workflow-item): stabilize avatar layout and fix name truncation

* fix(avatars): revert overflow bg to hardcoded color for contrast

* fix(executor): stop parallel execution when block errors (#2940)

* improvement(helm): add per-deployment extraVolumes support (#2942)

* fix(gmail): expose messageId field in read email block (#2943)

* fix(resolver): consolidate reference resolution  (#2941)

* fix(resolver): consolidate code to resolve references

* fix edge cases

* use already formatted error

* fix multi index

* fix backwards compat reachability

* handle backwards compatibility accurately

* use shared constant correctly

* feat(router): expose reasoning output in router v2 block (#2945)

* fix(copilot): always allow, credential masking (#2947)

* Fix always allow, credential validation

* Credential masking

* Autoload

* fix(executor): handle condition dead-end branches in loops (#2944)

---------

Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
Co-authored-by: Siddharth Ganesan <33737564+Sg312@users.noreply.github.com>
2026-01-22 13:48:15 -08:00
Waleed
cc2be33d6b v0.5.67: loading, password reset, ui improvements, helm updates (#2928)
* fix(zustand): updated to useShallow from deprecated createWithEqualityFn (#2919)

* fix(logger): use direct env access for webpack inlining (#2920)

* fix(notifications): text overflow with line-clamp (#2921)

* chore(helm): add env vars for Vertex AI, orgs, and telemetry (#2922)

* fix(auth): improve reset password flow and consolidate brand detection (#2924)

* fix(auth): improve reset password flow and consolidate brand detection

* fix(auth): set errorHandled for EMAIL_NOT_VERIFIED to prevent duplicate error

* fix(auth): clear success message on login errors

* chore(auth): fix import order per lint

* fix(action-bar): duplicate subflows with children (#2923)

* fix(action-bar): duplicate subflows with children

* fix(action-bar): add validateTriggerPaste for subflow duplicate

* fix(resolver): agent response format, input formats, root level (#2925)

* fix(resolvers): agent response format, input formats, root level

* fix response block initial seeding

* fix tests

* fix(messages-input): fix cursor alignment and auto-resize with overlay (#2926)

* fix(messages-input): fix cursor alignment and auto-resize with overlay

* fixed remaining zustand warnings

* fix(stores): remove dead code causing log spam on startup (#2927)

* fix(stores): remove dead code causing log spam on startup

* fix(stores): replace custom tools zustand store with react query cache

* improvement(ui): use BrandedButton and BrandedLink components (#2930)

- Refactor auth forms to use BrandedButton component
- Add BrandedLink component for changelog page
- Reduce code duplication in login, signup, reset-password forms
- Update star count default value

* fix(custom-tools): remove unsafe title fallback in getCustomTool (#2929)

* fix(custom-tools): remove unsafe title fallback in getCustomTool

* fix(custom-tools): restore title fallback in getCustomTool lookup

Custom tools are referenced by title (custom_${title}), not database ID.
The title fallback is required for client-side tool resolution to work.
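
A minimal sketch of the title-based lookup described above, with hypothetical names and shapes (not the project's actual getCustomTool or its store):

// Illustrative only: custom tools are referenced as `custom_${title}`, so the
// lookup falls back to matching by title when no database ID matches.
interface CustomTool {
  id: string
  title: string
  run: (args: unknown) => Promise<unknown>
}

function getCustomToolByRef(ref: string, tools: CustomTool[]): CustomTool | undefined {
  const key = ref.replace(/^custom_/, '')
  return tools.find((t) => t.id === key) ?? tools.find((t) => t.title === key)
}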

* fix(null-bodies): empty bodies handling (#2931)

* fix(null-statuses): empty bodies handling

* address bugbot comment

* fix(token-refresh): microsoft, notion, x, linear (#2933)

* fix(microsoft): proactive refresh needed

* fix(x): missing token refresh flag

* notion and linear missing flag too

* address bugbot comment

* fix(auth): handle EMAIL_NOT_VERIFIED in onError callback (#2932)

* fix(auth): handle EMAIL_NOT_VERIFIED in onError callback

* refactor(auth): extract redirectToVerify helper to reduce duplication

* fix(workflow-selector): use dedicated selector for workflow dropdown (#2934)

* feat(workflow-block): preview (#2935)

* improvement(copilot): tool configs to show nested props (#2936)

* fix(auth): add genericOAuth providers to trustedProviders (#2937)

---------

Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
Co-authored-by: Emir Karabeg <78010029+emir-karabeg@users.noreply.github.com>
2026-01-21 22:53:25 -08:00
Vikhyath Mondreti
45371e521e v0.5.66: external http requests fix, ring highlighting 2026-01-21 02:55:39 -08:00
Waleed
0ce0f98aa5 v0.5.65: gemini updates, textract integration, ui updates (#2909)
* fix(google): wrap primitive tool responses for Gemini API compatibility (#2900)

* fix(canonical): copilot path + update parent (#2901)

* fix(rss): add top-level title, link, pubDate fields to RSS trigger output (#2902)

* fix(rss): add top-level title, link, pubDate fields to RSS trigger output

* fix(imap): add top-level fields to IMAP trigger output

* improvement(browseruse): add profile id param (#2903)

* improvement(browseruse): add profile id param

* make request a stub since we have directExec

* improvement(executor): upgraded abort controller to handle aborts for loops and parallels (#2880)

* improvement(executor): upgraded abort controller to handle aborts for loops and parallels

* comments

* improvement(files): update execution for passing base64 strings (#2906)

* progress

* improvement(execution): update execution for passing base64 strings

* fix types

* cleanup comments

* path security vuln

* reject promise correctly

* fix redirect case

* remove proxy routes

* fix tests

* use ipaddr

* feat(tools): added textract, added v2 for mistral, updated tag dropdown (#2904)

* feat(tools): added textract

* cleanup

* ack pr comments

* reorder

* removed upload for textract async version

* fix additional fields dropdown in editor, update parser to leave validation to be done on the server

* added mistral v2, files v2, and finalized textract

* updated the rest of the old file patterns, updated mistral outputs for v2

* updated tag dropdown to parse non-operation fields as well

* updated extension finder

* cleanup

* added description for inputs to workflow

* use helper for internal route check

* fix tag dropdown merge conflict change

* remove duplicate code

---------

Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>

* fix(ui): change add inputs button to match output selector (#2907)

* fix(canvas): removed invite to workspace from canvas popover (#2908)

* fix(canvas): removed invite to workspace

* removed unused props

* fix(copilot): legacy tool display names (#2911)

* fix(a2a): canonical merge  (#2912)

* fix canonical merge

* fix empty array case

* fix(change-detection): copilot diffs have extra field (#2913)

* improvement(logs): improved logs ui bugs, added subflow disable UI (#2910)

* improvement(logs): improved logs ui bugs, added subflow disable UI

* added duplicate to action bar for subflows

* feat(broadcast): email v0.5 (#2905)

---------

Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
Co-authored-by: Emir Karabeg <78010029+emir-karabeg@users.noreply.github.com>
2026-01-20 23:54:55 -08:00
Waleed
dff1c9d083 v0.5.64: unsubscribe, search improvements, metrics, additional SSO configuration 2026-01-20 00:34:11 -08:00
Vikhyath Mondreti
b09f683072 v0.5.63: ui and performance improvements, more google tools 2026-01-18 15:22:42 -08:00
Vikhyath Mondreti
a8bb0db660 v0.5.62: webhook bug fixes, seeding default subblock values, block selection fixes 2026-01-16 20:27:06 -08:00
Waleed
af82820a28 v0.5.61: webhook improvements, workflow controls, react query for deployment status, chat fixes, reducto and pulse OCR, linear fixes 2026-01-16 18:06:23 -08:00
Waleed
4372841797 v0.5.60: invitation flow improvements, chat fixes, a2a improvements, additional copilot actions 2026-01-15 00:02:18 -08:00
Waleed
5e8c843241 v0.5.59: a2a support, documentation 2026-01-13 13:21:21 -08:00
Waleed
7bf3d73ee6 v0.5.58: export folders, new tools, permissions groups enhancements 2026-01-13 00:56:59 -08:00
Vikhyath Mondreti
7ffc11a738 v0.5.57: subagents, context menu improvements, bug fixes 2026-01-11 11:38:40 -08:00
Waleed
be578e2ed7 v0.5.56: batch operations, access control and permission groups, billing fixes 2026-01-10 00:31:34 -08:00
Waleed
f415e5edc4 v0.5.55: polling groups, bedrock provider, devcontainer fixes, workflow preview enhancements 2026-01-08 23:36:56 -08:00
Waleed
13a6e6c3fa v0.5.54: seo, model blacklist, helm chart updates, fireflies integration, autoconnect improvements, billing fixes 2026-01-07 16:09:45 -08:00
Waleed
f5ab7f21ae v0.5.53: hotkey improvements, added redis fallback, fixes for workflow tool 2026-01-06 23:34:52 -08:00
Waleed
bfb6fffe38 v0.5.52: new port-based router block, combobox expression and variable support 2026-01-06 16:14:10 -08:00
Waleed
4fbec0a43f v0.5.51: triggers, kb, condition block improvements, supabase and grain integration updates 2026-01-06 14:26:46 -08:00
Waleed
585f5e365b v0.5.50: import improvements, ui upgrades, kb styling and performance improvements 2026-01-05 00:35:55 -08:00
Waleed
3792bdd252 v0.5.49: hitl improvements, new email styles, imap trigger, logs context menu (#2672)
* feat(logs-context-menu): consolidated logs utils and types, added logs record context menu (#2659)

* feat(email): welcome email; improvement(emails): ui/ux (#2658)

* feat(email): welcome email; improvement(emails): ui/ux

* improvement(emails): links, accounts, preview

* refactor(emails): file structure and wrapper components

* added envvar for personal emails sent, added isHosted gate

* fixed failing tests, added env mock

* fix: removed comment

---------

Co-authored-by: waleed <walif6@gmail.com>

* fix(logging): hitl + trigger dev crash protection (#2664)

* hitl gaps

* deal with trigger worker crashes

* cleanup import structure

* feat(imap): added support for imap trigger (#2663)

* feat(tools): added support for imap trigger

* feat(imap): added parity, tested

* ack PR comments

* final cleanup

* feat(i18n): update translations (#2665)

Co-authored-by: waleedlatif1 <waleedlatif1@users.noreply.github.com>

* fix(grain): updated grain trigger to auto-establish trigger (#2666)

Co-authored-by: aadamgough <adam@sim.ai>

* feat(admin): routes to manage deployments (#2667)

* feat(admin): routes to manage deployments

* fix naming of deployed by

* feat(time-picker): added timepicker emcn component, added to playground, added searchable prop for dropdown, added more timezones for schedule, updated license and notice date (#2668)

* feat(time-picker): added timepicker emcn component, added to playground, added searchable prop for dropdown, added more timezones for schedule, updated license and notice date

* removed unused params, cleaned up redundant utils

* improvement(invite): aligned styling (#2669)

* improvement(invite): aligned with rest of app

* fix(invite): error handling

* fix: addressed comments

---------

Co-authored-by: Emir Karabeg <78010029+emir-karabeg@users.noreply.github.com>
Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
Co-authored-by: waleedlatif1 <waleedlatif1@users.noreply.github.com>
Co-authored-by: Adam Gough <77861281+aadamgough@users.noreply.github.com>
Co-authored-by: aadamgough <adam@sim.ai>
2026-01-03 13:19:18 -08:00
Waleed
eb5d1f3e5b v0.5.48: copy-paste workflow blocks, docs updates, mcp tool fixes 2025-12-31 18:00:04 -08:00
Waleed
54ab82c8dd v0.5.47: deploy workflow as mcp, kb chunks tokenizer, UI improvements, jira service management tools 2025-12-30 23:18:58 -08:00
Waleed
f895bf469b v0.5.46: build improvements, greptile, light mode improvements 2025-12-29 02:17:52 -08:00
Waleed
dd3209af06 v0.5.45: light mode fixes, realtime usage indicator, docker build improvements 2025-12-27 19:57:42 -08:00
Waleed
b6ba3b50a7 v0.5.44: keyboard shortcuts, autolayout, light mode, byok, testing improvements 2025-12-26 21:25:19 -08:00
Waleed
b304233062 v0.5.43: export logs, circleback, grain, vertex, code hygiene, schedule improvements 2025-12-23 19:19:18 -08:00
Vikhyath Mondreti
57e4b49bd6 v0.5.42: fix memory migration 2025-12-23 01:24:54 -08:00
Vikhyath Mondreti
e12dd204ed v0.5.41: memory fixes, copilot improvements, knowledgebase improvements, LLM providers standardization 2025-12-23 00:15:18 -08:00
Vikhyath Mondreti
3d9d9cbc54 v0.5.40: supabase ops to allow non-public schemas, jira uuid 2025-12-21 22:28:05 -08:00
Waleed
0f4ec962ad v0.5.39: notion, workflow variables fixes 2025-12-20 20:44:00 -08:00
Waleed
4827866f9a v0.5.38: snap to grid, copilot ux improvements, billing line items 2025-12-20 17:24:38 -08:00
Waleed
3e697d9ed9 v0.5.37: redaction utils consolidation, logs updates, autoconnect improvements, additional kb tag types 2025-12-19 22:31:55 -08:00
Martin Yankov
4431a1a484 fix(helm): add custom egress rules to realtime network policy (#2481)
The realtime service network policy was missing the custom egress rules section
that allows configuration of additional egress rules via values.yaml. This caused
the realtime pods to be unable to connect to external databases (e.g., PostgreSQL
on port 5432) when using external database configurations.

The app network policy already had this section, but the realtime network policy
was missing it, creating an inconsistency and preventing the realtime service
from accessing external databases configured via networkPolicy.egress values.

This fix adds the same custom egress rules template section to the realtime
network policy, matching the app network policy behavior and allowing users to
configure database connectivity via values.yaml.
2025-12-19 18:59:08 -08:00
Waleed
4d1a9a3f22 v0.5.36: hitl improvements, opengraph, slack fixes, one-click unsubscribe, auth checks, new db indexes 2025-12-19 01:27:49 -08:00
Vikhyath Mondreti
eb07a080fb v0.5.35: helm updates, copilot improvements, 404 for docs, salesforce fixes, subflow resize clamping 2025-12-18 16:23:19 -08:00
33 changed files with 972 additions and 2293 deletions

View File

@@ -4,22 +4,22 @@ import { auth } from '@/lib/auth'
import { isAuthDisabled } from '@/lib/core/config/feature-flags'
export async function POST() {
if (isAuthDisabled) {
return NextResponse.json({ token: 'anonymous-socket-token' })
}
try {
if (isAuthDisabled) {
return NextResponse.json({ token: 'anonymous-socket-token' })
}
const hdrs = await headers()
const response = await auth.api.generateOneTimeToken({
headers: hdrs,
})
if (!response?.token) {
return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
if (!response) {
return NextResponse.json({ error: 'Failed to generate token' }, { status: 500 })
}
return NextResponse.json({ token: response.token })
} catch {
} catch (error) {
return NextResponse.json({ error: 'Failed to generate token' }, { status: 500 })
}
}

View File

@@ -97,10 +97,7 @@ export async function POST(
const socketServerUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
await fetch(`${socketServerUrl}/api/workflow-reverted`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': env.INTERNAL_API_SECRET,
},
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ workflowId: id, timestamp: Date.now() }),
})
} catch (e) {

View File

@@ -361,10 +361,7 @@ export async function DELETE(
const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
const socketResponse = await fetch(`${socketUrl}/api/workflow-deleted`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': env.INTERNAL_API_SECRET,
},
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ workflowId }),
})

View File

@@ -254,10 +254,7 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
const notifyResponse = await fetch(`${socketUrl}/api/workflow-updated`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': env.INTERNAL_API_SECRET,
},
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ workflowId }),
})

View File

@@ -17,19 +17,6 @@ import { getEnv } from '@/lib/core/config/env'
const logger = createLogger('SocketContext')
const TAB_SESSION_ID_KEY = 'sim_tab_session_id'
function getTabSessionId(): string {
if (typeof window === 'undefined') return ''
let tabSessionId = sessionStorage.getItem(TAB_SESSION_ID_KEY)
if (!tabSessionId) {
tabSessionId = crypto.randomUUID()
sessionStorage.setItem(TAB_SESSION_ID_KEY, tabSessionId)
}
return tabSessionId
}
interface User {
id: string
name?: string
@@ -49,13 +36,11 @@ interface SocketContextType {
socket: Socket | null
isConnected: boolean
isConnecting: boolean
authFailed: boolean
currentWorkflowId: string | null
currentSocketId: string | null
presenceUsers: PresenceUser[]
joinWorkflow: (workflowId: string) => void
leaveWorkflow: () => void
retryConnection: () => void
emitWorkflowOperation: (
operation: string,
target: string,
@@ -78,6 +63,8 @@ interface SocketContextType {
onCursorUpdate: (handler: (data: any) => void) => void
onSelectionUpdate: (handler: (data: any) => void) => void
onUserJoined: (handler: (data: any) => void) => void
onUserLeft: (handler: (data: any) => void) => void
onWorkflowDeleted: (handler: (data: any) => void) => void
onWorkflowReverted: (handler: (data: any) => void) => void
onOperationConfirmed: (handler: (data: any) => void) => void
@@ -88,13 +75,11 @@ const SocketContext = createContext<SocketContextType>({
socket: null,
isConnected: false,
isConnecting: false,
authFailed: false,
currentWorkflowId: null,
currentSocketId: null,
presenceUsers: [],
joinWorkflow: () => {},
leaveWorkflow: () => {},
retryConnection: () => {},
emitWorkflowOperation: () => {},
emitSubblockUpdate: () => {},
emitVariableUpdate: () => {},
@@ -105,6 +90,8 @@ const SocketContext = createContext<SocketContextType>({
onVariableUpdate: () => {},
onCursorUpdate: () => {},
onSelectionUpdate: () => {},
onUserJoined: () => {},
onUserLeft: () => {},
onWorkflowDeleted: () => {},
onWorkflowReverted: () => {},
onOperationConfirmed: () => {},
@@ -125,43 +112,33 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const [currentWorkflowId, setCurrentWorkflowId] = useState<string | null>(null)
const [currentSocketId, setCurrentSocketId] = useState<string | null>(null)
const [presenceUsers, setPresenceUsers] = useState<PresenceUser[]>([])
const [authFailed, setAuthFailed] = useState(false)
const initializedRef = useRef(false)
const socketRef = useRef<Socket | null>(null)
const params = useParams()
const urlWorkflowId = params?.workflowId as string | undefined
const urlWorkflowIdRef = useRef(urlWorkflowId)
urlWorkflowIdRef.current = urlWorkflowId
const eventHandlers = useRef<{
workflowOperation?: (data: any) => void
subblockUpdate?: (data: any) => void
variableUpdate?: (data: any) => void
cursorUpdate?: (data: any) => void
selectionUpdate?: (data: any) => void
userJoined?: (data: any) => void
userLeft?: (data: any) => void
workflowDeleted?: (data: any) => void
workflowReverted?: (data: any) => void
operationConfirmed?: (data: any) => void
operationFailed?: (data: any) => void
}>({})
const positionUpdateTimeouts = useRef<Map<string, number>>(new Map())
const isRejoiningRef = useRef<boolean>(false)
const pendingPositionUpdates = useRef<Map<string, any>>(new Map())
const generateSocketToken = async (): Promise<string> => {
const res = await fetch('/api/auth/socket-token', {
method: 'POST',
credentials: 'include',
headers: { 'cache-control': 'no-store' },
})
if (!res.ok) {
if (res.status === 401) {
throw new Error('Authentication required')
}
throw new Error('Failed to generate socket token')
}
if (!res.ok) throw new Error('Failed to generate socket token')
const body = await res.json().catch(() => ({}))
const token = body?.token
if (!token || typeof token !== 'string') throw new Error('Invalid socket token')
@@ -171,11 +148,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
useEffect(() => {
if (!user?.id) return
if (authFailed) {
logger.info('Socket initialization skipped - auth failed, waiting for retry')
return
}
if (initializedRef.current || socket || isConnecting) {
logger.info('Socket already exists or is connecting, skipping initialization')
return
@@ -208,11 +180,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
cb({ token: freshToken })
} catch (error) {
logger.error('Failed to generate fresh token for connection:', error)
if (error instanceof Error && error.message === 'Authentication required') {
// True auth failure - pass null token, server will reject with "Authentication required"
cb({ token: null })
}
// For server errors, don't call cb - connection will timeout and Socket.IO will retry
cb({ token: null })
}
},
})
@@ -226,19 +194,26 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
connected: socketInstance.connected,
transport: socketInstance.io.engine?.transport?.name,
})
// Note: join-workflow is handled by the useEffect watching isConnected
if (urlWorkflowId) {
logger.info(`Joining workflow room after connection: ${urlWorkflowId}`)
socketInstance.emit('join-workflow', {
workflowId: urlWorkflowId,
})
setCurrentWorkflowId(urlWorkflowId)
}
})
socketInstance.on('disconnect', (reason) => {
setIsConnected(false)
setIsConnecting(false)
setCurrentSocketId(null)
setCurrentWorkflowId(null)
setPresenceUsers([])
logger.info('Socket disconnected', {
reason,
})
setPresenceUsers([])
})
socketInstance.on('connect_error', (error: any) => {
@@ -251,34 +226,24 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
transport: error.transport,
})
// Check if this is an authentication failure
const isAuthError =
if (
error.message?.includes('Token validation failed') ||
error.message?.includes('Authentication failed') ||
error.message?.includes('Authentication required')
if (isAuthError) {
) {
logger.warn(
'Authentication failed - stopping reconnection attempts. User may need to refresh/re-login.'
'Authentication failed - this could indicate session expiry or token generation issues'
)
// Stop reconnection attempts to prevent infinite loop
socketInstance.disconnect()
// Reset state to allow re-initialization when session is restored
setSocket(null)
setAuthFailed(true)
initializedRef.current = false
}
})
socketInstance.on('reconnect', (attemptNumber) => {
setIsConnected(true)
setCurrentSocketId(socketInstance.id ?? null)
logger.info('Socket reconnected successfully', {
attemptNumber,
socketId: socketInstance.id,
transport: socketInstance.io.engine?.transport?.name,
})
// Note: join-workflow is handled by the useEffect watching isConnected
})
socketInstance.on('reconnect_attempt', (attemptNumber) => {
@@ -319,26 +284,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
})
})
// Handle join workflow success - confirms room membership with presence list
socketInstance.on('join-workflow-success', ({ workflowId, presenceUsers }) => {
isRejoiningRef.current = false
// Ignore stale success responses from previous navigation
if (workflowId !== urlWorkflowIdRef.current) {
logger.debug(`Ignoring stale join-workflow-success for ${workflowId}`)
return
}
setCurrentWorkflowId(workflowId)
setPresenceUsers(presenceUsers || [])
logger.info(`Successfully joined workflow room: ${workflowId}`, {
presenceCount: presenceUsers?.length || 0,
})
})
socketInstance.on('join-workflow-error', ({ error }) => {
isRejoiningRef.current = false
logger.error('Failed to join workflow:', error)
})
socketInstance.on('workflow-operation', (data) => {
eventHandlers.current.workflowOperation?.(data)
})
@@ -353,13 +298,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socketInstance.on('workflow-deleted', (data) => {
logger.warn(`Workflow ${data.workflowId} has been deleted`)
setCurrentWorkflowId((current) => {
if (current === data.workflowId) {
setPresenceUsers([])
return null
}
return current
})
if (currentWorkflowId === data.workflowId) {
setCurrentWorkflowId(null)
setPresenceUsers([])
}
eventHandlers.current.workflowDeleted?.(data)
})
@@ -502,35 +444,25 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socketInstance.on('operation-forbidden', (error) => {
logger.warn('Operation forbidden:', error)
})
if (error?.type === 'SESSION_ERROR') {
const workflowId = urlWorkflowIdRef.current
if (workflowId && !isRejoiningRef.current) {
isRejoiningRef.current = true
logger.info(`Session expired, rejoining workflow: ${workflowId}`)
socketInstance.emit('join-workflow', {
workflowId,
tabSessionId: getTabSessionId(),
})
}
}
socketInstance.on('operation-confirmed', (data) => {
logger.debug('Operation confirmed:', data)
})
socketInstance.on('workflow-state', async (workflowData) => {
logger.info('Received workflow state from server')
if (workflowData?.state) {
try {
await rehydrateWorkflowStores(workflowData.id, workflowData.state, 'workflow-state')
} catch (error) {
logger.error('Error rehydrating workflow state:', error)
}
await rehydrateWorkflowStores(workflowData.id, workflowData.state, 'workflow-state')
}
})
socketRef.current = socketInstance
setSocket(socketInstance)
return () => {
socketInstance.close()
}
} catch (error) {
logger.error('Failed to initialize socket with token:', error)
setIsConnecting(false)
@@ -545,20 +477,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
})
positionUpdateTimeouts.current.clear()
pendingPositionUpdates.current.clear()
// Close socket on unmount
if (socketRef.current) {
logger.info('Closing socket connection on unmount')
socketRef.current.close()
socketRef.current = null
}
}
}, [user?.id, authFailed])
}, [user?.id])
useEffect(() => {
if (!socket || !isConnected || !urlWorkflowId) return
// Skip if already in the correct room
if (currentWorkflowId === urlWorkflowId) return
logger.info(
@@ -573,10 +497,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
logger.info(`Joining workflow room: ${urlWorkflowId}`)
socket.emit('join-workflow', {
workflowId: urlWorkflowId,
tabSessionId: getTabSessionId(),
})
setCurrentWorkflowId(urlWorkflowId)
}, [socket, isConnected, urlWorkflowId, currentWorkflowId])
useEffect(() => {
return () => {
if (socket) {
logger.info('Cleaning up socket connection on unmount')
socket.disconnect()
}
}
}, [])
const joinWorkflow = useCallback(
(workflowId: string) => {
if (!socket || !user?.id) {
@@ -597,9 +530,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
logger.info(`Joining workflow: ${workflowId}`)
socket.emit('join-workflow', {
workflowId,
tabSessionId: getTabSessionId(),
})
// currentWorkflowId will be set by join-workflow-success handler
setCurrentWorkflowId(workflowId)
},
[socket, user, currentWorkflowId]
)
@@ -607,13 +539,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const leaveWorkflow = useCallback(() => {
if (socket && currentWorkflowId) {
logger.info(`Leaving workflow: ${currentWorkflowId}`)
import('@/stores/operation-queue/store')
.then(({ useOperationQueueStore }) => {
useOperationQueueStore.getState().cancelOperationsForWorkflow(currentWorkflowId)
})
.catch((error) => {
logger.warn('Failed to cancel operations for workflow:', error)
})
try {
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
useOperationQueueStore.getState().cancelOperationsForWorkflow(currentWorkflowId)
} catch {}
socket.emit('leave-workflow')
setCurrentWorkflowId(null)
setPresenceUsers([])
@@ -626,20 +555,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}
}, [socket, currentWorkflowId])
/**
* Retry socket connection after auth failure.
* Call this when user has re-authenticated (e.g., after login redirect).
*/
const retryConnection = useCallback(() => {
if (!authFailed) {
logger.info('retryConnection called but no auth failure - ignoring')
return
}
logger.info('Retrying socket connection after auth failure')
setAuthFailed(false)
// initializedRef.current was already reset in connect_error handler
// Effect will re-run and attempt connection
}, [authFailed])
const positionUpdateTimeouts = useRef<Map<string, number>>(new Map())
const pendingPositionUpdates = useRef<Map<string, any>>(new Map())
const emitWorkflowOperation = useCallback(
(operation: string, target: string, payload: any, operationId?: string) => {
@@ -799,6 +716,14 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.selectionUpdate = handler
}, [])
const onUserJoined = useCallback((handler: (data: any) => void) => {
eventHandlers.current.userJoined = handler
}, [])
const onUserLeft = useCallback((handler: (data: any) => void) => {
eventHandlers.current.userLeft = handler
}, [])
const onWorkflowDeleted = useCallback((handler: (data: any) => void) => {
eventHandlers.current.workflowDeleted = handler
}, [])
@@ -820,13 +745,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socket,
isConnected,
isConnecting,
authFailed,
currentWorkflowId,
currentSocketId,
presenceUsers,
joinWorkflow,
leaveWorkflow,
retryConnection,
emitWorkflowOperation,
emitSubblockUpdate,
emitVariableUpdate,
@@ -837,6 +760,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
onVariableUpdate,
onCursorUpdate,
onSelectionUpdate,
onUserJoined,
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
onOperationConfirmed,
@@ -846,13 +771,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socket,
isConnected,
isConnecting,
authFailed,
currentWorkflowId,
currentSocketId,
presenceUsers,
joinWorkflow,
leaveWorkflow,
retryConnection,
emitWorkflowOperation,
emitSubblockUpdate,
emitVariableUpdate,
@@ -863,6 +786,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
onVariableUpdate,
onCursorUpdate,
onSelectionUpdate,
onUserJoined,
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
onOperationConfirmed,

View File

@@ -2417,177 +2417,4 @@ describe('EdgeManager', () => {
expect(successReady).toContain(targetId)
})
})
describe('Condition with loop downstream - deactivation propagation', () => {
it('should deactivate nodes after loop when condition branch containing loop is deactivated', () => {
// Scenario: condition → (if) → sentinel_start → loopBody → sentinel_end → (loop_exit) → after_loop
// → (else) → other_branch
// When condition takes "else" path, the entire if-branch including nodes after the loop should be deactivated
const conditionId = 'condition'
const sentinelStartId = 'sentinel-start'
const loopBodyId = 'loop-body'
const sentinelEndId = 'sentinel-end'
const afterLoopId = 'after-loop'
const otherBranchId = 'other-branch'
const conditionNode = createMockNode(conditionId, [
{ target: sentinelStartId, sourceHandle: 'condition-if' },
{ target: otherBranchId, sourceHandle: 'condition-else' },
])
const sentinelStartNode = createMockNode(
sentinelStartId,
[{ target: loopBodyId }],
[conditionId]
)
const loopBodyNode = createMockNode(
loopBodyId,
[{ target: sentinelEndId }],
[sentinelStartId]
)
const sentinelEndNode = createMockNode(
sentinelEndId,
[
{ target: sentinelStartId, sourceHandle: 'loop_continue' },
{ target: afterLoopId, sourceHandle: 'loop_exit' },
],
[loopBodyId]
)
const afterLoopNode = createMockNode(afterLoopId, [], [sentinelEndId])
const otherBranchNode = createMockNode(otherBranchId, [], [conditionId])
const nodes = new Map<string, DAGNode>([
[conditionId, conditionNode],
[sentinelStartId, sentinelStartNode],
[loopBodyId, loopBodyNode],
[sentinelEndId, sentinelEndNode],
[afterLoopId, afterLoopNode],
[otherBranchId, otherBranchNode],
])
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
// Condition selects "else" branch, deactivating the "if" branch (which contains the loop)
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'else' })
// Only otherBranch should be ready
expect(readyNodes).toContain(otherBranchId)
expect(readyNodes).not.toContain(sentinelStartId)
// afterLoop should NOT be ready - its incoming edge from sentinel_end should be deactivated
expect(readyNodes).not.toContain(afterLoopId)
// Verify that countActiveIncomingEdges returns 0 for afterLoop
// (meaning the loop_exit edge was properly deactivated)
// Note: isNodeReady returns true when all edges are deactivated (no pending deps),
// but the node won't be in readyNodes since it wasn't reached via an active path
expect(edgeManager.isNodeReady(afterLoopNode)).toBe(true) // All edges deactivated = no blocking deps
})
it('should deactivate nodes after parallel when condition branch containing parallel is deactivated', () => {
// Similar scenario with parallel instead of loop
const conditionId = 'condition'
const parallelStartId = 'parallel-start'
const parallelBodyId = 'parallel-body'
const parallelEndId = 'parallel-end'
const afterParallelId = 'after-parallel'
const otherBranchId = 'other-branch'
const conditionNode = createMockNode(conditionId, [
{ target: parallelStartId, sourceHandle: 'condition-if' },
{ target: otherBranchId, sourceHandle: 'condition-else' },
])
const parallelStartNode = createMockNode(
parallelStartId,
[{ target: parallelBodyId }],
[conditionId]
)
const parallelBodyNode = createMockNode(
parallelBodyId,
[{ target: parallelEndId }],
[parallelStartId]
)
const parallelEndNode = createMockNode(
parallelEndId,
[{ target: afterParallelId, sourceHandle: 'parallel_exit' }],
[parallelBodyId]
)
const afterParallelNode = createMockNode(afterParallelId, [], [parallelEndId])
const otherBranchNode = createMockNode(otherBranchId, [], [conditionId])
const nodes = new Map<string, DAGNode>([
[conditionId, conditionNode],
[parallelStartId, parallelStartNode],
[parallelBodyId, parallelBodyNode],
[parallelEndId, parallelEndNode],
[afterParallelId, afterParallelNode],
[otherBranchId, otherBranchNode],
])
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
// Condition selects "else" branch
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'else' })
expect(readyNodes).toContain(otherBranchId)
expect(readyNodes).not.toContain(parallelStartId)
expect(readyNodes).not.toContain(afterParallelId)
// isNodeReady returns true when all edges are deactivated (no pending deps)
expect(edgeManager.isNodeReady(afterParallelNode)).toBe(true)
})
it('should still correctly handle normal loop exit (not deactivate when loop runs)', () => {
// When a loop actually executes and exits normally, after_loop should become ready
const sentinelStartId = 'sentinel-start'
const loopBodyId = 'loop-body'
const sentinelEndId = 'sentinel-end'
const afterLoopId = 'after-loop'
const sentinelStartNode = createMockNode(sentinelStartId, [{ target: loopBodyId }])
const loopBodyNode = createMockNode(
loopBodyId,
[{ target: sentinelEndId }],
[sentinelStartId]
)
const sentinelEndNode = createMockNode(
sentinelEndId,
[
{ target: sentinelStartId, sourceHandle: 'loop_continue' },
{ target: afterLoopId, sourceHandle: 'loop_exit' },
],
[loopBodyId]
)
const afterLoopNode = createMockNode(afterLoopId, [], [sentinelEndId])
const nodes = new Map<string, DAGNode>([
[sentinelStartId, sentinelStartNode],
[loopBodyId, loopBodyNode],
[sentinelEndId, sentinelEndNode],
[afterLoopId, afterLoopNode],
])
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
// Simulate sentinel_end completing with loop_exit (loop is done)
const readyNodes = edgeManager.processOutgoingEdges(sentinelEndNode, {
selectedRoute: 'loop_exit',
})
// afterLoop should be ready
expect(readyNodes).toContain(afterLoopId)
})
})
})

View File

@@ -243,7 +243,7 @@ export class EdgeManager {
}
for (const [, outgoingEdge] of targetNode.outgoingEdges) {
if (!this.isBackwardsEdge(outgoingEdge.sourceHandle)) {
if (!this.isControlEdge(outgoingEdge.sourceHandle)) {
this.deactivateEdgeAndDescendants(
targetId,
outgoingEdge.target,

View File

@@ -119,6 +119,8 @@ export function useCollaborativeWorkflow() {
onWorkflowOperation,
onSubblockUpdate,
onVariableUpdate,
onUserJoined,
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
onOperationConfirmed,
@@ -482,6 +484,14 @@ export function useCollaborativeWorkflow() {
}
}
const handleUserJoined = (data: any) => {
logger.info(`User joined: ${data.userName}`)
}
const handleUserLeft = (data: any) => {
logger.info(`User left: ${data.userId}`)
}
const handleWorkflowDeleted = (data: any) => {
const { workflowId } = data
logger.warn(`Workflow ${workflowId} has been deleted`)
@@ -590,17 +600,26 @@ export function useCollaborativeWorkflow() {
failOperation(operationId, retryable)
}
// Register event handlers
onWorkflowOperation(handleWorkflowOperation)
onSubblockUpdate(handleSubblockUpdate)
onVariableUpdate(handleVariableUpdate)
onUserJoined(handleUserJoined)
onUserLeft(handleUserLeft)
onWorkflowDeleted(handleWorkflowDeleted)
onWorkflowReverted(handleWorkflowReverted)
onOperationConfirmed(handleOperationConfirmed)
onOperationFailed(handleOperationFailed)
return () => {
// Cleanup handled by socket context
}
}, [
onWorkflowOperation,
onSubblockUpdate,
onVariableUpdate,
onUserJoined,
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
onOperationConfirmed,

View File

@@ -10,7 +10,6 @@ import {
type KnowledgeBaseArgs,
} from '@/lib/copilot/tools/shared/schemas'
import { useCopilotStore } from '@/stores/panel/copilot/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
/**
* Client tool for knowledge base operations
@@ -103,19 +102,7 @@ export class KnowledgeBaseClientTool extends BaseClientTool {
const logger = createLogger('KnowledgeBaseClientTool')
try {
this.setState(ClientToolCallState.executing)
// Get the workspace ID from the workflow registry hydration state
const { hydration } = useWorkflowRegistry.getState()
const workspaceId = hydration.workspaceId
// Build payload with workspace ID included in args
const payload: KnowledgeBaseArgs = {
...(args || { operation: 'list' }),
args: {
...(args?.args || {}),
workspaceId: workspaceId || undefined,
},
}
const payload: KnowledgeBaseArgs = { ...(args || { operation: 'list' }) }
const res = await fetch('/api/copilot/execute-copilot-server-tool', {
method: 'POST',

View File

@@ -2508,10 +2508,6 @@ async function validateWorkflowSelectorIds(
for (const subBlockConfig of blockConfig.subBlocks) {
if (!SELECTOR_TYPES.has(subBlockConfig.type)) continue
// Skip oauth-input - credentials are pre-validated before edit application
// This allows existing collaborator credentials to remain untouched
if (subBlockConfig.type === 'oauth-input') continue
const subBlockValue = blockData.subBlocks?.[subBlockConfig.id]?.value
if (!subBlockValue) continue
@@ -2577,295 +2573,6 @@ async function validateWorkflowSelectorIds(
return errors
}
/**
* Pre-validates credential and apiKey inputs in operations before they are applied.
* - Validates oauth-input (credential) IDs belong to the user
* - Filters out apiKey inputs for hosted models when isHosted is true
* - Also validates credentials and apiKeys in nestedNodes (blocks inside loop/parallel)
* Returns validation errors for any removed inputs.
*/
async function preValidateCredentialInputs(
operations: EditWorkflowOperation[],
context: { userId: string },
workflowState?: Record<string, unknown>
): Promise<{ filteredOperations: EditWorkflowOperation[]; errors: ValidationError[] }> {
const { isHosted } = await import('@/lib/core/config/feature-flags')
const { getHostedModels } = await import('@/providers/utils')
const logger = createLogger('PreValidateCredentials')
const errors: ValidationError[] = []
// Collect credential and apiKey inputs that need validation/filtering
const credentialInputs: Array<{
operationIndex: number
blockId: string
blockType: string
fieldName: string
value: string
nestedBlockId?: string
}> = []
const hostedApiKeyInputs: Array<{
operationIndex: number
blockId: string
blockType: string
model: string
nestedBlockId?: string
}> = []
const hostedModelsLower = isHosted ? new Set(getHostedModels().map((m) => m.toLowerCase())) : null
/**
* Collect credential inputs from a block's inputs based on its block config
*/
function collectCredentialInputs(
blockConfig: ReturnType<typeof getBlock>,
inputs: Record<string, unknown>,
opIndex: number,
blockId: string,
blockType: string,
nestedBlockId?: string
) {
if (!blockConfig) return
for (const subBlockConfig of blockConfig.subBlocks) {
if (subBlockConfig.type !== 'oauth-input') continue
const inputValue = inputs[subBlockConfig.id]
if (!inputValue || typeof inputValue !== 'string' || inputValue.trim() === '') continue
credentialInputs.push({
operationIndex: opIndex,
blockId,
blockType,
fieldName: subBlockConfig.id,
value: inputValue,
nestedBlockId,
})
}
}
/**
* Check if apiKey should be filtered for a block with the given model
*/
function collectHostedApiKeyInput(
inputs: Record<string, unknown>,
modelValue: string | undefined,
opIndex: number,
blockId: string,
blockType: string,
nestedBlockId?: string
) {
if (!hostedModelsLower || !inputs.apiKey) return
if (!modelValue || typeof modelValue !== 'string') return
if (hostedModelsLower.has(modelValue.toLowerCase())) {
hostedApiKeyInputs.push({
operationIndex: opIndex,
blockId,
blockType,
model: modelValue,
nestedBlockId,
})
}
}
operations.forEach((op, opIndex) => {
// Process main block inputs
if (op.params?.inputs && op.params?.type) {
const blockConfig = getBlock(op.params.type)
if (blockConfig) {
// Collect credentials from main block
collectCredentialInputs(
blockConfig,
op.params.inputs as Record<string, unknown>,
opIndex,
op.block_id,
op.params.type
)
// Check for apiKey inputs on hosted models
let modelValue = (op.params.inputs as Record<string, unknown>).model as string | undefined
// For edit operations, if model is not being changed, check existing block's model
if (
!modelValue &&
op.operation_type === 'edit' &&
(op.params.inputs as Record<string, unknown>).apiKey &&
workflowState
) {
const existingBlock = (workflowState.blocks as Record<string, unknown>)?.[op.block_id] as
| Record<string, unknown>
| undefined
const existingSubBlocks = existingBlock?.subBlocks as Record<string, unknown> | undefined
const existingModelSubBlock = existingSubBlocks?.model as
| Record<string, unknown>
| undefined
modelValue = existingModelSubBlock?.value as string | undefined
}
collectHostedApiKeyInput(
op.params.inputs as Record<string, unknown>,
modelValue,
opIndex,
op.block_id,
op.params.type
)
}
}
// Process nested nodes (blocks inside loop/parallel containers)
const nestedNodes = op.params?.nestedNodes as
| Record<string, Record<string, unknown>>
| undefined
if (nestedNodes) {
Object.entries(nestedNodes).forEach(([childId, childBlock]) => {
const childType = childBlock.type as string | undefined
const childInputs = childBlock.inputs as Record<string, unknown> | undefined
if (!childType || !childInputs) return
const childBlockConfig = getBlock(childType)
if (!childBlockConfig) return
// Collect credentials from nested block
collectCredentialInputs(
childBlockConfig,
childInputs,
opIndex,
op.block_id,
childType,
childId
)
// Check for apiKey inputs on hosted models in nested block
const modelValue = childInputs.model as string | undefined
collectHostedApiKeyInput(childInputs, modelValue, opIndex, op.block_id, childType, childId)
})
}
})
const hasCredentialsToValidate = credentialInputs.length > 0
const hasHostedApiKeysToFilter = hostedApiKeyInputs.length > 0
if (!hasCredentialsToValidate && !hasHostedApiKeysToFilter) {
return { filteredOperations: operations, errors }
}
// Deep clone operations so we can modify them
const filteredOperations = structuredClone(operations)
// Filter out apiKey inputs for hosted models and add validation errors
if (hasHostedApiKeysToFilter) {
logger.info('Filtering apiKey inputs for hosted models', { count: hostedApiKeyInputs.length })
for (const apiKeyInput of hostedApiKeyInputs) {
const op = filteredOperations[apiKeyInput.operationIndex]
// Handle nested block apiKey filtering
if (apiKeyInput.nestedBlockId) {
const nestedNodes = op.params?.nestedNodes as
| Record<string, Record<string, unknown>>
| undefined
const nestedBlock = nestedNodes?.[apiKeyInput.nestedBlockId]
const nestedInputs = nestedBlock?.inputs as Record<string, unknown> | undefined
if (nestedInputs?.apiKey) {
nestedInputs.apiKey = undefined
logger.debug('Filtered apiKey for hosted model in nested block', {
parentBlockId: apiKeyInput.blockId,
nestedBlockId: apiKeyInput.nestedBlockId,
model: apiKeyInput.model,
})
errors.push({
blockId: apiKeyInput.nestedBlockId,
blockType: apiKeyInput.blockType,
field: 'apiKey',
value: '[redacted]',
error: `Cannot set API key for hosted model "${apiKeyInput.model}" - API keys are managed by the platform when using hosted models`,
})
}
} else if (op.params?.inputs?.apiKey) {
// Handle main block apiKey filtering
op.params.inputs.apiKey = undefined
logger.debug('Filtered apiKey for hosted model', {
blockId: apiKeyInput.blockId,
model: apiKeyInput.model,
})
errors.push({
blockId: apiKeyInput.blockId,
blockType: apiKeyInput.blockType,
field: 'apiKey',
value: '[redacted]',
error: `Cannot set API key for hosted model "${apiKeyInput.model}" - API keys are managed by the platform when using hosted models`,
})
}
}
}
// Validate credential inputs
if (hasCredentialsToValidate) {
logger.info('Pre-validating credential inputs', {
credentialCount: credentialInputs.length,
userId: context.userId,
})
const allCredentialIds = credentialInputs.map((c) => c.value)
const validationResult = await validateSelectorIds('oauth-input', allCredentialIds, context)
const invalidSet = new Set(validationResult.invalid)
if (invalidSet.size > 0) {
for (const credInput of credentialInputs) {
if (!invalidSet.has(credInput.value)) continue
const op = filteredOperations[credInput.operationIndex]
// Handle nested block credential removal
if (credInput.nestedBlockId) {
const nestedNodes = op.params?.nestedNodes as
| Record<string, Record<string, unknown>>
| undefined
const nestedBlock = nestedNodes?.[credInput.nestedBlockId]
const nestedInputs = nestedBlock?.inputs as Record<string, unknown> | undefined
if (nestedInputs?.[credInput.fieldName]) {
delete nestedInputs[credInput.fieldName]
logger.info('Removed invalid credential from nested block', {
parentBlockId: credInput.blockId,
nestedBlockId: credInput.nestedBlockId,
field: credInput.fieldName,
invalidValue: credInput.value,
})
}
} else if (op.params?.inputs?.[credInput.fieldName]) {
// Handle main block credential removal
delete op.params.inputs[credInput.fieldName]
logger.info('Removed invalid credential from operation', {
blockId: credInput.blockId,
field: credInput.fieldName,
invalidValue: credInput.value,
})
}
const warningInfo = validationResult.warning ? `. ${validationResult.warning}` : ''
const errorBlockId = credInput.nestedBlockId ?? credInput.blockId
errors.push({
blockId: errorBlockId,
blockType: credInput.blockType,
field: credInput.fieldName,
value: credInput.value,
error: `Invalid credential ID "${credInput.value}" - credential does not exist or user doesn't have access${warningInfo}`,
})
}
logger.warn('Filtered out invalid credentials', {
invalidCount: invalidSet.size,
})
}
}
return { filteredOperations, errors }
}
async function getCurrentWorkflowStateFromDb(
workflowId: string
): Promise<{ workflowState: any; subBlockValues: Record<string, Record<string, any>> }> {
@@ -2950,29 +2657,12 @@ export const editWorkflowServerTool: BaseServerTool<EditWorkflowParams, any> = {
// Get permission config for the user
const permissionConfig = context?.userId ? await getUserPermissionConfig(context.userId) : null
// Pre-validate credential and apiKey inputs before applying operations
// This filters out invalid credentials and apiKeys for hosted models
let operationsToApply = operations
const credentialErrors: ValidationError[] = []
if (context?.userId) {
const { filteredOperations, errors: credErrors } = await preValidateCredentialInputs(
operations,
{ userId: context.userId },
workflowState
)
operationsToApply = filteredOperations
credentialErrors.push(...credErrors)
}
// Apply operations directly to the workflow state
const {
state: modifiedWorkflowState,
validationErrors,
skippedItems,
} = applyOperationsToWorkflowState(workflowState, operationsToApply, permissionConfig)
// Add credential validation errors
validationErrors.push(...credentialErrors)
} = applyOperationsToWorkflowState(workflowState, operations, permissionConfig)
// Get workspaceId for selector validation
let workspaceId: string | undefined

View File

@@ -74,7 +74,6 @@
"@react-email/components": "^0.0.34",
"@react-email/render": "2.0.0",
"@sim/logger": "workspace:*",
"@socket.io/redis-adapter": "8.3.0",
"@t3-oss/env-nextjs": "0.13.4",
"@tanstack/react-query": "5.90.8",
"@tanstack/react-query-devtools": "5.90.2",
@@ -145,7 +144,6 @@
"react-simple-code-editor": "^0.14.1",
"react-window": "2.2.3",
"reactflow": "^11.11.4",
"redis": "5.10.0",
"rehype-autolink-headings": "^7.1.0",
"rehype-slug": "^6.0.0",
"remark-gfm": "4.0.1",

View File

@@ -1,7 +1,5 @@
import type { Server as HttpServer } from 'http'
import { createLogger } from '@sim/logger'
import { createAdapter } from '@socket.io/redis-adapter'
import { createClient, type RedisClientType } from 'redis'
import { Server } from 'socket.io'
import { env } from '@/lib/core/config/env'
import { isProd } from '@/lib/core/config/feature-flags'
@@ -9,16 +7,9 @@ import { getBaseUrl } from '@/lib/core/utils/urls'
const logger = createLogger('SocketIOConfig')
/** Socket.IO ping timeout - how long to wait for pong before considering connection dead */
const PING_TIMEOUT_MS = 60000
/** Socket.IO ping interval - how often to send ping packets */
const PING_INTERVAL_MS = 25000
/** Maximum HTTP buffer size for Socket.IO messages */
const MAX_HTTP_BUFFER_SIZE = 1e6
let adapterPubClient: RedisClientType | null = null
let adapterSubClient: RedisClientType | null = null
/**
* Get allowed origins for Socket.IO CORS configuration
*/
function getAllowedOrigins(): string[] {
const allowedOrigins = [
getBaseUrl(),
@@ -33,10 +24,11 @@ function getAllowedOrigins(): string[] {
}
/**
* Create and configure a Socket.IO server instance.
* If REDIS_URL is configured, adds Redis adapter for cross-pod broadcasting.
* Create and configure a Socket.IO server instance
* @param httpServer - The HTTP server instance to attach Socket.IO to
* @returns Configured Socket.IO server instance
*/
export async function createSocketIOServer(httpServer: HttpServer): Promise<Server> {
export function createSocketIOServer(httpServer: HttpServer): Server {
const allowedOrigins = getAllowedOrigins()
const io = new Server(httpServer, {
@@ -44,110 +36,31 @@ export async function createSocketIOServer(httpServer: HttpServer): Promise<Serv
origin: allowedOrigins,
methods: ['GET', 'POST', 'OPTIONS'],
allowedHeaders: ['Content-Type', 'Authorization', 'Cookie', 'socket.io'],
credentials: true,
credentials: true, // Enable credentials to accept cookies
},
transports: ['websocket', 'polling'],
allowEIO3: true,
pingTimeout: PING_TIMEOUT_MS,
pingInterval: PING_INTERVAL_MS,
maxHttpBufferSize: MAX_HTTP_BUFFER_SIZE,
transports: ['websocket', 'polling'], // WebSocket first, polling as fallback
allowEIO3: true, // Keep legacy support for compatibility
pingTimeout: 60000, // Back to original conservative setting
pingInterval: 25000, // Back to original interval
maxHttpBufferSize: 1e6,
cookie: {
name: 'io',
path: '/',
httpOnly: true,
sameSite: 'none',
secure: isProd,
sameSite: 'none', // Required for cross-origin cookies
secure: isProd, // HTTPS in production
},
})
if (env.REDIS_URL) {
logger.info('Configuring Socket.IO Redis adapter...')
const redisOptions = {
url: env.REDIS_URL,
socket: {
reconnectStrategy: (retries: number) => {
if (retries > 10) {
logger.error('Redis adapter reconnection failed after 10 attempts')
return new Error('Redis adapter reconnection failed')
}
const delay = Math.min(retries * 100, 3000)
logger.warn(`Redis adapter reconnecting in ${delay}ms (attempt ${retries})`)
return delay
},
},
}
// Create separate clients for pub and sub (recommended for reliability)
adapterPubClient = createClient(redisOptions)
adapterSubClient = createClient(redisOptions)
adapterPubClient.on('error', (err) => {
logger.error('Redis adapter pub client error:', err)
})
adapterSubClient.on('error', (err) => {
logger.error('Redis adapter sub client error:', err)
})
adapterPubClient.on('ready', () => {
logger.info('Redis adapter pub client ready')
})
adapterSubClient.on('ready', () => {
logger.info('Redis adapter sub client ready')
})
await Promise.all([adapterPubClient.connect(), adapterSubClient.connect()])
io.adapter(createAdapter(adapterPubClient, adapterSubClient))
logger.info('Socket.IO Redis adapter connected - cross-pod broadcasting enabled')
} else {
logger.warn('REDIS_URL not configured - running in single-pod mode')
}
logger.info('Socket.IO server configured with:', {
allowedOrigins: allowedOrigins.length,
transports: ['websocket', 'polling'],
pingTimeout: PING_TIMEOUT_MS,
pingInterval: PING_INTERVAL_MS,
maxHttpBufferSize: MAX_HTTP_BUFFER_SIZE,
pingTimeout: 60000,
pingInterval: 25000,
maxHttpBufferSize: 1e6,
cookieSecure: isProd,
corsCredentials: true,
redisAdapter: !!env.REDIS_URL,
})
return io
}
/**
* Clean up Redis adapter connections.
* Call this during graceful shutdown.
*/
export async function shutdownSocketIOAdapter(): Promise<void> {
const closePromises: Promise<void>[] = []
if (adapterPubClient) {
closePromises.push(
adapterPubClient.quit().then(() => {
logger.info('Redis adapter pub client closed')
adapterPubClient = null
})
)
}
if (adapterSubClient) {
closePromises.push(
adapterSubClient.quit().then(() => {
logger.info('Redis adapter sub client closed')
adapterSubClient = null
})
)
}
if (closePromises.length > 0) {
await Promise.all(closePromises)
logger.info('Socket.IO Redis adapter shutdown complete')
}
}
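
The Redis adapter wiring shown above follows the standard @socket.io/redis-adapter pattern: two Redis clients (publish and subscribe) connected before the adapter is attached. A minimal sketch under that assumption; the code above builds two clients with a custom reconnect strategy, while this sketch uses the simpler duplicate() form:

import { createServer } from 'http'
import { createAdapter } from '@socket.io/redis-adapter'
import { createClient } from 'redis'
import { Server } from 'socket.io'

// Minimal sketch: the adapter needs dedicated pub and sub connections.
async function createIoWithRedisAdapter(redisUrl: string): Promise<Server> {
  const httpServer = createServer()
  const io = new Server(httpServer)

  const pubClient = createClient({ url: redisUrl })
  const subClient = pubClient.duplicate()
  await Promise.all([pubClient.connect(), subClient.connect()])

  // With the adapter attached, io.to(room).emit(...) and io.to(socketId).emit(...)
  // reach sockets hosted on other pods as well.
  io.adapter(createAdapter(pubClient, subClient))
  return io
}

This is what the "works cross-pod via Redis adapter" comments in the handler files further down rely on; without the adapter, broadcasts only reach sockets on the same process.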

View File

@@ -1,12 +1,17 @@
import { createLogger } from '@sim/logger'
import { cleanupPendingSubblocksForSocket } from '@/socket/handlers/subblocks'
import { cleanupPendingVariablesForSocket } from '@/socket/handlers/variables'
import type { HandlerDependencies } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import type { IRoomManager } from '@/socket/rooms'
import type { RoomManager } from '@/socket/rooms/manager'
const logger = createLogger('ConnectionHandlers')
export function setupConnectionHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
export function setupConnectionHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
) {
const roomManager =
deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
socket.on('error', (error) => {
logger.error(`Socket ${socket.id} error:`, error)
})
@@ -15,22 +20,13 @@ export function setupConnectionHandlers(socket: AuthenticatedSocket, roomManager
logger.error(`Socket ${socket.id} connection error:`, error)
})
socket.on('disconnect', async (reason) => {
try {
// Clean up pending debounce entries for this socket to prevent memory leaks
cleanupPendingSubblocksForSocket(socket.id)
cleanupPendingVariablesForSocket(socket.id)
socket.on('disconnect', (reason) => {
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
const workflowId = await roomManager.removeUserFromRoom(socket.id)
if (workflowId) {
await roomManager.broadcastPresenceUpdate(workflowId)
logger.info(
`Socket ${socket.id} disconnected from workflow ${workflowId} (reason: ${reason})`
)
}
} catch (error) {
logger.error(`Error handling disconnect for socket ${socket.id}:`, error)
if (workflowId && session) {
roomManager.cleanupUserFromRoom(socket.id, workflowId)
roomManager.broadcastPresenceUpdate(workflowId)
}
})
}
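
One side of the disconnect handler above also clears any debounced updates owned by the departing socket, so the pending maps in the subblock and variable handlers cannot grow without bound. A condensed sketch of that cleanup; the Pending shape mirrors those handlers, and the function name here is illustrative:

// Drop the departing socket's operationIds from every pending entry. The debounce
// timeout still fires and persists the value; only the per-socket confirmation is skipped.
type Pending = { opToSocket: Map<string, string> }

function cleanupPendingForSocket(pendingUpdates: Map<string, Pending>, socketId: string): void {
  for (const pending of pendingUpdates.values()) {
    for (const [opId, sid] of pending.opToSocket.entries()) {
      if (sid === socketId) {
        pending.opToSocket.delete(opId)
      }
    }
  }
}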

View File

@@ -5,9 +5,16 @@ import { setupSubblocksHandlers } from '@/socket/handlers/subblocks'
import { setupVariablesHandlers } from '@/socket/handlers/variables'
import { setupWorkflowHandlers } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import type { IRoomManager } from '@/socket/rooms'
import type { RoomManager, UserPresence, WorkflowRoom } from '@/socket/rooms/manager'
export function setupAllHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
export type { UserPresence, WorkflowRoom }
/**
* Sets up all socket event handlers for an authenticated socket connection
* @param socket - The authenticated socket instance
* @param roomManager - Room manager instance for state management
*/
export function setupAllHandlers(socket: AuthenticatedSocket, roomManager: RoomManager) {
setupWorkflowHandlers(socket, roomManager)
setupOperationsHandlers(socket, roomManager)
setupSubblocksHandlers(socket, roomManager)
@@ -15,3 +22,12 @@ export function setupAllHandlers(socket: AuthenticatedSocket, roomManager: IRoom
setupPresenceHandlers(socket, roomManager)
setupConnectionHandlers(socket, roomManager)
}
export {
setupWorkflowHandlers,
setupOperationsHandlers,
setupSubblocksHandlers,
setupVariablesHandlers,
setupPresenceHandlers,
setupConnectionHandlers,
}

View File

@@ -10,41 +10,38 @@ import {
WORKFLOW_OPERATIONS,
} from '@/socket/constants'
import { persistWorkflowOperation } from '@/socket/database/operations'
import type { HandlerDependencies } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import { checkRolePermission } from '@/socket/middleware/permissions'
import type { IRoomManager } from '@/socket/rooms'
import type { RoomManager } from '@/socket/rooms/manager'
import { WorkflowOperationSchema } from '@/socket/validation/schemas'
const logger = createLogger('OperationsHandlers')
export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
export function setupOperationsHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
) {
const roomManager =
deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
socket.on('workflow-operation', async (data) => {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
socket.emit('operation-forbidden', {
type: 'SESSION_ERROR',
message: 'Session expired, please rejoin workflow',
socket.emit('error', {
type: 'NOT_JOINED',
message: 'Not joined to any workflow',
})
if (data?.operationId) {
socket.emit('operation-failed', { operationId: data.operationId, error: 'Session expired' })
}
return
}
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
if (!hasRoom) {
socket.emit('operation-forbidden', {
const room = roomManager.getWorkflowRoom(workflowId)
if (!room) {
socket.emit('error', {
type: 'ROOM_NOT_FOUND',
message: 'Workflow room not found',
})
if (data?.operationId) {
socket.emit('operation-failed', {
operationId: data.operationId,
error: 'Workflow room not found',
})
}
return
}
@@ -63,18 +60,16 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
isPositionUpdate && 'commit' in payload ? payload.commit === true : false
const operationTimestamp = isPositionUpdate ? timestamp : Date.now()
// Get user presence for permission checking
const users = await roomManager.getWorkflowUsers(workflowId)
const userPresence = users.find((u) => u.socketId === socket.id)
// Skip permission checks for non-committed position updates (broadcasts only, no persistence)
if (isPositionUpdate && !commitPositionUpdate) {
// Update last activity
const userPresence = room.users.get(socket.id)
if (userPresence) {
await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })
userPresence.lastActivity = Date.now()
}
} else {
// Check permissions from cached role for all other operations
const userPresence = room.users.get(socket.id)
if (!userPresence) {
logger.warn(`User presence not found for socket ${socket.id}`)
socket.emit('operation-forbidden', {
@@ -83,13 +78,10 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
operation,
target,
})
if (operationId) {
socket.emit('operation-failed', { operationId, error: 'User session not found' })
}
return
}
await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })
userPresence.lastActivity = Date.now()
// Check permissions using cached role (no DB query)
const permissionCheck = checkRolePermission(userPresence.role, operation)
@@ -140,7 +132,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
timestamp: operationTimestamp,
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
if (operationId) {
socket.emit('operation-confirmed', {
@@ -186,7 +178,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
timestamp: operationTimestamp,
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
if (operationId) {
socket.emit('operation-confirmed', { operationId, serverTimestamp: Date.now() })
@@ -219,7 +211,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
const broadcastData = {
operation,
@@ -259,7 +251,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
const broadcastData = {
operation,
@@ -296,7 +288,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -328,7 +320,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -357,7 +349,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -389,7 +381,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -421,7 +413,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -453,7 +445,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -482,7 +474,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -511,24 +503,27 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
userId: session.userId,
})
await roomManager.updateRoomLastModified(workflowId)
room.lastModified = Date.now()
const broadcastData = {
operation,
target,
payload,
timestamp: operationTimestamp,
timestamp: operationTimestamp, // Preserve client timestamp for position updates
senderId: socket.id,
userId: session.userId,
userName: session.userName,
// Add operation metadata for better client handling
metadata: {
workflowId,
operationId: crypto.randomUUID(),
isPositionUpdate, // Flag to help clients handle position updates specially
},
}
socket.to(workflowId).emit('workflow-operation', broadcastData)
// Emit confirmation if operationId is provided
if (operationId) {
socket.emit('operation-confirmed', {
operationId,
@@ -538,14 +533,16 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error occurred'
// Emit operation-failed for queue-tracked operations
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: errorMessage,
retryable: !(error instanceof ZodError),
retryable: !(error instanceof ZodError), // Don't retry validation errors
})
}
// Also emit legacy operation-error for backward compatibility
if (error instanceof ZodError) {
socket.emit('operation-error', {
type: 'VALIDATION_ERROR',
@@ -556,6 +553,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
})
logger.warn(`Validation error for operation from ${session.userId}:`, error.errors)
} else if (error instanceof Error) {
// Handle specific database errors
if (error.message.includes('not found')) {
socket.emit('operation-error', {
type: 'RESOURCE_NOT_FOUND',

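Throughout the handler above, any operation carrying an operationId is eventually answered with either operation-confirmed or operation-failed. A client-side sketch of how that acknowledgement contract can be consumed; sendOperation, the timeout, and the socket.io-client usage are illustrative, only the event and field names come from the handler:

import type { Socket } from 'socket.io-client'

// Illustrative helper: resolves on operation-confirmed for this operationId,
// rejects on operation-failed or after a timeout.
function sendOperation(
  socket: Socket,
  payload: Record<string, unknown>,
  timeoutMs = 5000
): Promise<void> {
  const operationId = crypto.randomUUID()
  return new Promise((resolve, reject) => {
    const timer = setTimeout(() => {
      cleanup()
      reject(new Error(`Operation ${operationId} timed out`))
    }, timeoutMs)

    const onConfirmed = (data: { operationId: string }) => {
      if (data.operationId !== operationId) return
      cleanup()
      resolve()
    }
    const onFailed = (data: { operationId: string; error: string }) => {
      if (data.operationId !== operationId) return
      cleanup()
      reject(new Error(data.error))
    }
    const cleanup = () => {
      clearTimeout(timer)
      socket.off('operation-confirmed', onConfirmed)
      socket.off('operation-failed', onFailed)
    }

    socket.on('operation-confirmed', onConfirmed)
    socket.on('operation-failed', onFailed)
    socket.emit('workflow-operation', { ...payload, operationId })
  })
}
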
View File

@@ -1,53 +1,62 @@
import { createLogger } from '@sim/logger'
import type { HandlerDependencies } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import type { IRoomManager } from '@/socket/rooms'
import type { RoomManager } from '@/socket/rooms/manager'
const logger = createLogger('PresenceHandlers')
export function setupPresenceHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('cursor-update', async ({ cursor }) => {
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
export function setupPresenceHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
) {
const roomManager =
deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
socket.on('cursor-update', ({ cursor }) => {
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
if (!workflowId || !session) return
if (!workflowId || !session) return
// Update cursor in room state
await roomManager.updateUserActivity(workflowId, socket.id, { cursor })
const room = roomManager.getWorkflowRoom(workflowId)
if (!room) return
// Broadcast to other users in the room
socket.to(workflowId).emit('cursor-update', {
socketId: socket.id,
userId: session.userId,
userName: session.userName,
avatarUrl: session.avatarUrl,
cursor,
})
} catch (error) {
logger.error(`Error handling cursor update for socket ${socket.id}:`, error)
const userPresence = room.users.get(socket.id)
if (userPresence) {
userPresence.cursor = cursor
userPresence.lastActivity = Date.now()
}
socket.to(workflowId).emit('cursor-update', {
socketId: socket.id,
userId: session.userId,
userName: session.userName,
avatarUrl: session.avatarUrl,
cursor,
})
})
socket.on('selection-update', async ({ selection }) => {
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
// Handle user selection (for showing what block/element a user has selected)
socket.on('selection-update', ({ selection }) => {
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
if (!workflowId || !session) return
if (!workflowId || !session) return
// Update selection in room state
await roomManager.updateUserActivity(workflowId, socket.id, { selection })
const room = roomManager.getWorkflowRoom(workflowId)
if (!room) return
// Broadcast to other users in the room
socket.to(workflowId).emit('selection-update', {
socketId: socket.id,
userId: session.userId,
userName: session.userName,
avatarUrl: session.avatarUrl,
selection,
})
} catch (error) {
logger.error(`Error handling selection update for socket ${socket.id}:`, error)
const userPresence = room.users.get(socket.id)
if (userPresence) {
userPresence.selection = selection
userPresence.lastActivity = Date.now()
}
socket.to(workflowId).emit('selection-update', {
socketId: socket.id,
userId: session.userId,
userName: session.userName,
avatarUrl: session.avatarUrl,
selection,
})
})
}

View File

@@ -2,14 +2,12 @@ import { db } from '@sim/db'
import { workflow, workflowBlocks } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import type { HandlerDependencies } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import type { IRoomManager } from '@/socket/rooms'
import type { RoomManager } from '@/socket/rooms/manager'
const logger = createLogger('SubblocksHandlers')
/** Debounce interval for coalescing rapid subblock updates before persisting */
const DEBOUNCE_INTERVAL_MS = 25
type PendingSubblock = {
latest: { blockId: string; subblockId: string; value: any; timestamp: number }
timeout: NodeJS.Timeout
@@ -20,61 +18,44 @@ type PendingSubblock = {
// Keyed by `${workflowId}:${blockId}:${subblockId}`
const pendingSubblockUpdates = new Map<string, PendingSubblock>()
/**
* Cleans up pending updates for a disconnected socket.
* Removes the socket's operationIds from pending updates to prevent memory leaks.
*/
export function cleanupPendingSubblocksForSocket(socketId: string): void {
for (const [, pending] of pendingSubblockUpdates.entries()) {
// Remove this socket's operation entries
for (const [opId, sid] of pending.opToSocket.entries()) {
if (sid === socketId) {
pending.opToSocket.delete(opId)
}
}
// If no more operations are waiting, the timeout will still fire and flush
// This is fine - the update will still persist, just no confirmation to send
}
}
export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
export function setupSubblocksHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
) {
const roomManager =
deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
socket.on('subblock-update', async (data) => {
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
logger.debug(`Ignoring subblock update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasSession: !!session,
})
return
}
const { blockId, subblockId, value, timestamp, operationId } = data
const room = roomManager.getWorkflowRoom(workflowId)
if (!room) {
logger.debug(`Ignoring subblock update: workflow room not found`, {
socketId: socket.id,
workflowId,
blockId,
subblockId,
})
return
}
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
logger.debug(`Ignoring subblock update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasSession: !!session,
})
socket.emit('operation-forbidden', {
type: 'SESSION_ERROR',
message: 'Session expired, please rejoin workflow',
})
if (operationId) {
socket.emit('operation-failed', { operationId, error: 'Session expired' })
}
return
const userPresence = room.users.get(socket.id)
if (userPresence) {
userPresence.lastActivity = Date.now()
}
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`Ignoring subblock update: workflow room not found`, {
socketId: socket.id,
workflowId,
blockId,
subblockId,
})
return
}
// Update user activity
await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })
// Server-side debounce/coalesce by workflowId+blockId+subblockId
const debouncedKey = `${workflowId}:${blockId}:${subblockId}`
const existing = pendingSubblockUpdates.get(debouncedKey)
@@ -85,7 +66,7 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
existing.timeout = setTimeout(async () => {
await flushSubblockUpdate(workflowId, existing, roomManager)
pendingSubblockUpdates.delete(debouncedKey)
}, DEBOUNCE_INTERVAL_MS)
}, 25)
} else {
const opToSocket = new Map<string, string>()
if (operationId) opToSocket.set(operationId, socket.id)
@@ -95,7 +76,7 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
await flushSubblockUpdate(workflowId, pending, roomManager)
pendingSubblockUpdates.delete(debouncedKey)
}
}, DEBOUNCE_INTERVAL_MS)
}, 25)
pendingSubblockUpdates.set(debouncedKey, {
latest: { blockId, subblockId, value, timestamp },
timeout,
@@ -107,6 +88,7 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
// Best-effort failure for the single operation if provided
if (operationId) {
socket.emit('operation-failed', {
operationId,
@@ -115,6 +97,7 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
})
}
// Also emit legacy operation-error for backward compatibility
socket.emit('operation-error', {
type: 'SUBBLOCK_UPDATE_FAILED',
message: `Failed to update subblock ${blockId}.${subblockId}: ${errorMessage}`,
@@ -128,11 +111,9 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
async function flushSubblockUpdate(
workflowId: string,
pending: PendingSubblock,
roomManager: IRoomManager
roomManager: RoomManager
) {
const { blockId, subblockId, value, timestamp } = pending.latest
const io = roomManager.io
try {
// Verify workflow still exists
const workflowExists = await db
@@ -143,11 +124,14 @@ async function flushSubblockUpdate(
if (workflowExists.length === 0) {
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-failed', {
operationId: opId,
error: 'Workflow not found',
retryable: false,
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: 'Workflow not found',
retryable: false,
})
}
})
return
}
@@ -180,48 +164,60 @@ async function flushSubblockUpdate(
})
if (updateSuccessful) {
// Broadcast to room excluding all senders (works cross-pod via Redis adapter)
const senderSocketIds = [...pending.opToSocket.values()]
if (senderSocketIds.length > 0) {
io.to(workflowId).except(senderSocketIds).emit('subblock-update', {
blockId,
subblockId,
value,
timestamp,
})
} else {
io.to(workflowId).emit('subblock-update', {
blockId,
subblockId,
value,
timestamp,
})
// Broadcast to other clients (exclude senders to avoid overwriting their local state)
const senderSocketIds = new Set(pending.opToSocket.values())
const io = (roomManager as any).io
if (io) {
// Get all sockets in the room
const roomSockets = io.sockets.adapter.rooms.get(workflowId)
if (roomSockets) {
roomSockets.forEach((socketId: string) => {
// Only emit to sockets that didn't send any of the coalesced ops
if (!senderSocketIds.has(socketId)) {
const sock = io.sockets.sockets.get(socketId)
if (sock) {
sock.emit('subblock-update', {
blockId,
subblockId,
value,
timestamp,
})
}
}
})
}
}
// Confirm all coalesced operationIds (io.to(socketId) works cross-pod)
// Confirm all coalesced operationIds
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-confirmed', {
operationId: opId,
serverTimestamp: Date.now(),
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() })
}
})
} else {
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-failed', {
operationId: opId,
error: 'Block no longer exists',
retryable: false,
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: 'Block no longer exists',
retryable: false,
})
}
})
}
} catch (error) {
logger.error('Error flushing subblock update:', error)
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-failed', {
operationId: opId,
error: error instanceof Error ? error.message : 'Unknown error',
retryable: true,
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: error instanceof Error ? error.message : 'Unknown error',
retryable: true,
})
}
})
}
}
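
Both versions of the subblock handler above coalesce bursts of updates server-side, keyed by workflowId:blockId:subblockId, so only the latest value per key is persisted. A stripped-down sketch of that pattern; the 25 ms interval matches the handler, while createCoalescer and the flush callback are illustrative stand-ins:

// Keeps only the newest value per key and flushes it once the key has been
// quiet for intervalMs; repeated calls for the same key collapse into one write.
type Flush<T> = (key: string, latest: T) => Promise<void>

function createCoalescer<T>(flush: Flush<T>, intervalMs = 25) {
  const pending = new Map<string, { latest: T; timeout: NodeJS.Timeout }>()

  return (key: string, value: T): void => {
    const existing = pending.get(key)
    if (existing) {
      existing.latest = value
      clearTimeout(existing.timeout)
      existing.timeout = setTimeout(async () => {
        await flush(key, existing.latest)
        pending.delete(key)
      }, intervalMs)
      return
    }
    const timeout = setTimeout(async () => {
      const entry = pending.get(key)
      if (entry) {
        await flush(key, entry.latest)
        pending.delete(key)
      }
    }, intervalMs)
    pending.set(key, { latest: value, timeout })
  }
}

// Usage sketch: const enqueue = createCoalescer(persistSubblock)
//               enqueue(`${workflowId}:${blockId}:${subblockId}`, { value, timestamp })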

View File

@@ -2,14 +2,12 @@ import { db } from '@sim/db'
import { workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import type { HandlerDependencies } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import type { IRoomManager } from '@/socket/rooms'
import type { RoomManager } from '@/socket/rooms/manager'
const logger = createLogger('VariablesHandlers')
/** Debounce interval for coalescing rapid variable updates before persisting */
const DEBOUNCE_INTERVAL_MS = 25
type PendingVariable = {
latest: { variableId: string; field: string; value: any; timestamp: number }
timeout: NodeJS.Timeout
@@ -19,58 +17,45 @@ type PendingVariable = {
// Keyed by `${workflowId}:${variableId}:${field}`
const pendingVariableUpdates = new Map<string, PendingVariable>()
/**
* Cleans up pending updates for a disconnected socket.
* Removes the socket's operationIds from pending updates to prevent memory leaks.
*/
export function cleanupPendingVariablesForSocket(socketId: string): void {
for (const [, pending] of pendingVariableUpdates.entries()) {
for (const [opId, sid] of pending.opToSocket.entries()) {
if (sid === socketId) {
pending.opToSocket.delete(opId)
}
}
}
}
export function setupVariablesHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
) {
const roomManager =
deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('variable-update', async (data) => {
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
logger.debug(`Ignoring variable update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasSession: !!session,
})
return
}
const { variableId, field, value, timestamp, operationId } = data
const room = roomManager.getWorkflowRoom(workflowId)
if (!room) {
logger.debug(`Ignoring variable update: workflow room not found`, {
socketId: socket.id,
workflowId,
variableId,
field,
})
return
}
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
logger.debug(`Ignoring variable update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasSession: !!session,
})
socket.emit('operation-forbidden', {
type: 'SESSION_ERROR',
message: 'Session expired, please rejoin workflow',
})
if (operationId) {
socket.emit('operation-failed', { operationId, error: 'Session expired' })
}
return
const userPresence = room.users.get(socket.id)
if (userPresence) {
userPresence.lastActivity = Date.now()
}
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`Ignoring variable update: workflow room not found`, {
socketId: socket.id,
workflowId,
variableId,
field,
})
return
}
// Update user activity
await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })
const debouncedKey = `${workflowId}:${variableId}:${field}`
const existing = pendingVariableUpdates.get(debouncedKey)
if (existing) {
@@ -80,7 +65,7 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
existing.timeout = setTimeout(async () => {
await flushVariableUpdate(workflowId, existing, roomManager)
pendingVariableUpdates.delete(debouncedKey)
}, DEBOUNCE_INTERVAL_MS)
}, 25)
} else {
const opToSocket = new Map<string, string>()
if (operationId) opToSocket.set(operationId, socket.id)
@@ -90,7 +75,7 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
await flushVariableUpdate(workflowId, pending, roomManager)
pendingVariableUpdates.delete(debouncedKey)
}
}, DEBOUNCE_INTERVAL_MS)
}, 25)
pendingVariableUpdates.set(debouncedKey, {
latest: { variableId, field, value, timestamp },
timeout,
@@ -123,11 +108,9 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
async function flushVariableUpdate(
workflowId: string,
pending: PendingVariable,
roomManager: IRoomManager
roomManager: RoomManager
) {
const { variableId, field, value, timestamp } = pending.latest
const io = roomManager.io
try {
const workflowExists = await db
.select({ id: workflow.id })
@@ -137,11 +120,14 @@ async function flushVariableUpdate(
if (workflowExists.length === 0) {
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-failed', {
operationId: opId,
error: 'Workflow not found',
retryable: false,
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: 'Workflow not found',
retryable: false,
})
}
})
return
}
@@ -177,50 +163,59 @@ async function flushVariableUpdate(
})
if (updateSuccessful) {
// Broadcast to room excluding all senders (works cross-pod via Redis adapter)
const senderSocketIds = [...pending.opToSocket.values()]
if (senderSocketIds.length > 0) {
io.to(workflowId).except(senderSocketIds).emit('variable-update', {
variableId,
field,
value,
timestamp,
})
} else {
io.to(workflowId).emit('variable-update', {
variableId,
field,
value,
timestamp,
})
// Broadcast to other clients (exclude senders to avoid overwriting their local state)
const senderSocketIds = new Set(pending.opToSocket.values())
const io = (roomManager as any).io
if (io) {
const roomSockets = io.sockets.adapter.rooms.get(workflowId)
if (roomSockets) {
roomSockets.forEach((socketId: string) => {
if (!senderSocketIds.has(socketId)) {
const sock = io.sockets.sockets.get(socketId)
if (sock) {
sock.emit('variable-update', {
variableId,
field,
value,
timestamp,
})
}
}
})
}
}
// Confirm all coalesced operationIds (io.to(socketId) works cross-pod)
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-confirmed', {
operationId: opId,
serverTimestamp: Date.now(),
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() })
}
})
logger.debug(`Flushed variable update ${workflowId}: ${variableId}.${field}`)
} else {
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-failed', {
operationId: opId,
error: 'Variable no longer exists',
retryable: false,
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: 'Variable no longer exists',
retryable: false,
})
}
})
}
} catch (error) {
logger.error('Error flushing variable update:', error)
pending.opToSocket.forEach((socketId, opId) => {
io.to(socketId).emit('operation-failed', {
operationId: opId,
error: error instanceof Error ? error.message : 'Unknown error',
retryable: true,
})
const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
if (sock) {
sock.emit('operation-failed', {
operationId: opId,
error: error instanceof Error ? error.message : 'Unknown error',
retryable: true,
})
}
})
}
}

View File

@@ -4,12 +4,38 @@ import { eq } from 'drizzle-orm'
import { getWorkflowState } from '@/socket/database/operations'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import { verifyWorkflowAccess } from '@/socket/middleware/permissions'
import type { IRoomManager, UserPresence } from '@/socket/rooms'
import type { RoomManager, UserPresence, WorkflowRoom } from '@/socket/rooms/manager'
const logger = createLogger('WorkflowHandlers')
export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('join-workflow', async ({ workflowId, tabSessionId }) => {
export type { UserPresence, WorkflowRoom }
export interface HandlerDependencies {
roomManager: RoomManager
}
export const createWorkflowRoom = (workflowId: string): WorkflowRoom => ({
workflowId,
users: new Map(),
lastModified: Date.now(),
activeConnections: 0,
})
export const cleanupUserFromRoom = (
socketId: string,
workflowId: string,
roomManager: RoomManager
) => {
roomManager.cleanupUserFromRoom(socketId, workflowId)
}
export function setupWorkflowHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
) {
const roomManager =
deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
socket.on('join-workflow', async ({ workflowId }) => {
try {
const userId = socket.userId
const userName = socket.userName
@@ -22,7 +48,6 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`)
// Verify workflow access
let userRole: string
try {
const accessInfo = await verifyWorkflowAccess(userId, workflowId)
@@ -38,37 +63,23 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
return
}
// Leave current room if in one
const currentWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const currentWorkflowId = roomManager.getWorkflowIdForSocket(socket.id)
if (currentWorkflowId) {
socket.leave(currentWorkflowId)
await roomManager.removeUserFromRoom(socket.id)
await roomManager.broadcastPresenceUpdate(currentWorkflowId)
roomManager.cleanupUserFromRoom(socket.id, currentWorkflowId)
roomManager.broadcastPresenceUpdate(currentWorkflowId)
}
const STALE_THRESHOLD_MS = 60_000
const now = Date.now()
const existingUsers = await roomManager.getWorkflowUsers(workflowId)
for (const existingUser of existingUsers) {
if (existingUser.userId === userId && existingUser.socketId !== socket.id) {
const isSameTab = tabSessionId && existingUser.tabSessionId === tabSessionId
const isStale =
now - (existingUser.lastActivity || existingUser.joinedAt || 0) > STALE_THRESHOLD_MS
if (isSameTab || isStale) {
logger.info(
`Cleaning up socket ${existingUser.socketId} for user ${userId} (${isSameTab ? 'same tab' : 'stale'})`
)
await roomManager.removeUserFromRoom(existingUser.socketId)
roomManager.io.in(existingUser.socketId).socketsLeave(workflowId)
}
}
}
// Join the new room
socket.join(workflowId)
// Get avatar URL
if (!roomManager.hasWorkflowRoom(workflowId)) {
roomManager.setWorkflowRoom(workflowId, roomManager.createWorkflowRoom(workflowId))
}
const room = roomManager.getWorkflowRoom(workflowId)!
room.activeConnections++
let avatarUrl = socket.userImage || null
if (!avatarUrl) {
try {
@@ -84,68 +95,54 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
}
}
// Create presence entry
const userPresence: UserPresence = {
userId,
workflowId,
userName,
socketId: socket.id,
tabSessionId,
joinedAt: Date.now(),
lastActivity: Date.now(),
role: userRole,
avatarUrl,
}
// Add user to room
await roomManager.addUserToRoom(workflowId, socket.id, userPresence)
// Get current presence list for the join acknowledgment
const presenceUsers = await roomManager.getWorkflowUsers(workflowId)
// Get workflow state
const workflowState = await getWorkflowState(workflowId)
// Send join success with presence list (client waits for this to confirm join)
socket.emit('join-workflow-success', {
workflowId,
socketId: socket.id,
presenceUsers,
room.users.set(socket.id, userPresence)
roomManager.setWorkflowForSocket(socket.id, workflowId)
roomManager.setUserSession(socket.id, {
userId,
userName,
avatarUrl,
})
// Send workflow state
const workflowState = await getWorkflowState(workflowId)
socket.emit('workflow-state', workflowState)
// Broadcast presence update to all users in the room
await roomManager.broadcastPresenceUpdate(workflowId)
roomManager.broadcastPresenceUpdate(workflowId)
const uniqueUserCount = await roomManager.getUniqueUserCount(workflowId)
const uniqueUserCount = roomManager.getUniqueUserCount(workflowId)
logger.info(
`User ${userId} (${userName}) joined workflow ${workflowId}. Room now has ${uniqueUserCount} unique users.`
`User ${userId} (${userName}) joined workflow ${workflowId}. Room now has ${uniqueUserCount} unique users (${room.activeConnections} connections).`
)
} catch (error) {
logger.error('Error joining workflow:', error)
// Undo socket.join and room manager entry if any operation failed
socket.leave(workflowId)
await roomManager.removeUserFromRoom(socket.id)
socket.emit('join-workflow-error', { error: 'Failed to join workflow' })
socket.emit('error', {
type: 'JOIN_ERROR',
message: 'Failed to join workflow',
})
}
})
socket.on('leave-workflow', async () => {
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
socket.on('leave-workflow', () => {
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
if (workflowId && session) {
socket.leave(workflowId)
await roomManager.removeUserFromRoom(socket.id)
await roomManager.broadcastPresenceUpdate(workflowId)
if (workflowId && session) {
socket.leave(workflowId)
roomManager.cleanupUserFromRoom(socket.id, workflowId)
logger.info(`User ${session.userId} (${session.userName}) left workflow ${workflowId}`)
}
} catch (error) {
logger.error('Error leaving workflow:', error)
roomManager.broadcastPresenceUpdate(workflowId)
logger.info(`User ${session.userId} (${session.userName}) left workflow ${workflowId}`)
}
})
}
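
Both sides of the join-workflow hunk above follow the same overall sequence: verify access, leave any current room, join the new one, record presence, send the current workflow state, then broadcast presence. A condensed sketch of that flow; RoomsLike and the helper callbacks are illustrative stand-ins for the real RoomManager and middleware:

import type { Socket } from 'socket.io'

interface RoomsLike {
  currentRoomOf(socketId: string): string | undefined
  remove(socketId: string, workflowId: string): void
  add(socketId: string, workflowId: string, presence: { userId: string; role: string }): void
  broadcastPresence(workflowId: string): void
}

async function joinWorkflow(
  socket: Socket & { userId: string },
  workflowId: string,
  rooms: RoomsLike,
  verifyAccess: (userId: string, workflowId: string) => Promise<{ role: string }>,
  loadState: (workflowId: string) => Promise<unknown>
): Promise<void> {
  // 1. Access check first; a failure here never touches room state.
  const { role } = await verifyAccess(socket.userId, workflowId)

  // 2. Leave any room the socket is already in so presence stays accurate.
  const current = rooms.currentRoomOf(socket.id)
  if (current) {
    socket.leave(current)
    rooms.remove(socket.id, current)
    rooms.broadcastPresence(current)
  }

  // 3. Join, record presence, hydrate the joining client, then announce.
  socket.join(workflowId)
  rooms.add(socket.id, workflowId, { userId: socket.userId, role })
  socket.emit('workflow-state', await loadState(workflowId))
  rooms.broadcastPresence(workflowId)
}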

View File

@@ -7,7 +7,7 @@ import { createServer, request as httpRequest } from 'http'
import { createMockLogger, databaseMock } from '@sim/testing'
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'
import { createSocketIOServer } from '@/socket/config/socket'
import { MemoryRoomManager } from '@/socket/rooms'
import { RoomManager } from '@/socket/rooms/manager'
import { createHttpHandler } from '@/socket/routes/http'
vi.mock('@/lib/auth', () => ({
@@ -20,30 +20,6 @@ vi.mock('@/lib/auth', () => ({
vi.mock('@sim/db', () => databaseMock)
// Mock redis package to prevent actual Redis connections
vi.mock('redis', () => ({
createClient: vi.fn(() => ({
on: vi.fn(),
connect: vi.fn().mockResolvedValue(undefined),
quit: vi.fn().mockResolvedValue(undefined),
duplicate: vi.fn().mockReturnThis(),
})),
}))
// Mock env to not have REDIS_URL (use importOriginal to get helper functions)
vi.mock('@/lib/core/config/env', async (importOriginal) => {
const actual = await importOriginal<typeof import('@/lib/core/config/env')>()
return {
...actual,
env: {
...actual.env,
DATABASE_URL: 'postgres://localhost/test',
NODE_ENV: 'test',
REDIS_URL: undefined,
},
}
})
vi.mock('@/socket/middleware/auth', () => ({
authenticateSocket: vi.fn((socket, next) => {
socket.userId = 'test-user-id'
@@ -75,7 +51,7 @@ vi.mock('@/socket/database/operations', () => ({
describe('Socket Server Index Integration', () => {
let httpServer: any
let io: any
let roomManager: MemoryRoomManager
let roomManager: RoomManager
let logger: ReturnType<typeof createMockLogger>
let PORT: number
@@ -88,10 +64,9 @@ describe('Socket Server Index Integration', () => {
httpServer = createServer()
io = await createSocketIOServer(httpServer)
io = createSocketIOServer(httpServer)
roomManager = new MemoryRoomManager(io)
await roomManager.initialize()
roomManager = new RoomManager(io)
const httpHandler = createHttpHandler(roomManager, logger)
httpServer.on('request', httpHandler)
@@ -123,9 +98,6 @@ describe('Socket Server Index Integration', () => {
}, 20000)
afterEach(async () => {
if (roomManager) {
await roomManager.shutdown()
}
if (io) {
await new Promise<void>((resolve) => {
io.close(() => resolve())
@@ -205,60 +177,43 @@ describe('Socket Server Index Integration', () => {
})
describe('Room Manager Integration', () => {
it('should create room manager successfully', async () => {
it('should create room manager successfully', () => {
expect(roomManager).toBeDefined()
expect(await roomManager.getTotalActiveConnections()).toBe(0)
expect(roomManager.getTotalActiveConnections()).toBe(0)
})
it('should add and get users from workflow rooms', async () => {
it('should create workflow rooms', () => {
const workflowId = 'test-workflow-123'
const socketId = 'test-socket-123'
const room = roomManager.createWorkflowRoom(workflowId)
roomManager.setWorkflowRoom(workflowId, room)
const presence = {
userId: 'user-123',
workflowId,
userName: 'Test User',
socketId,
joinedAt: Date.now(),
lastActivity: Date.now(),
role: 'admin',
}
await roomManager.addUserToRoom(workflowId, socketId, presence)
expect(await roomManager.hasWorkflowRoom(workflowId)).toBe(true)
const users = await roomManager.getWorkflowUsers(workflowId)
expect(users).toHaveLength(1)
expect(users[0].socketId).toBe(socketId)
expect(roomManager.hasWorkflowRoom(workflowId)).toBe(true)
const retrievedRoom = roomManager.getWorkflowRoom(workflowId)
expect(retrievedRoom).toBeDefined()
expect(retrievedRoom?.workflowId).toBe(workflowId)
})
it('should manage user sessions', async () => {
it('should manage user sessions', () => {
const socketId = 'test-socket-123'
const workflowId = 'test-workflow-456'
const session = { userId: 'user-123', userName: 'Test User' }
const presence = {
userId: 'user-123',
workflowId,
userName: 'Test User',
socketId,
joinedAt: Date.now(),
lastActivity: Date.now(),
role: 'admin',
}
roomManager.setWorkflowForSocket(socketId, workflowId)
roomManager.setUserSession(socketId, session)
await roomManager.addUserToRoom(workflowId, socketId, presence)
expect(await roomManager.getWorkflowIdForSocket(socketId)).toBe(workflowId)
const session = await roomManager.getUserSession(socketId)
expect(session).toBeDefined()
expect(session?.userId).toBe('user-123')
expect(roomManager.getWorkflowIdForSocket(socketId)).toBe(workflowId)
expect(roomManager.getUserSession(socketId)).toEqual(session)
})
it('should clean up rooms properly', async () => {
it('should clean up rooms properly', () => {
const workflowId = 'test-workflow-789'
const socketId = 'test-socket-789'
const presence = {
const room = roomManager.createWorkflowRoom(workflowId)
roomManager.setWorkflowRoom(workflowId, room)
// Add user to room
room.users.set(socketId, {
userId: 'user-789',
workflowId,
userName: 'Test User',
@@ -266,18 +221,16 @@ describe('Socket Server Index Integration', () => {
joinedAt: Date.now(),
lastActivity: Date.now(),
role: 'admin',
}
})
room.activeConnections = 1
await roomManager.addUserToRoom(workflowId, socketId, presence)
roomManager.setWorkflowForSocket(socketId, workflowId)
expect(await roomManager.hasWorkflowRoom(workflowId)).toBe(true)
// Clean up user
roomManager.cleanupUserFromRoom(socketId, workflowId)
// Remove user
await roomManager.removeUserFromRoom(socketId)
// Room should be cleaned up since it's now empty
expect(await roomManager.hasWorkflowRoom(workflowId)).toBe(false)
expect(await roomManager.getWorkflowIdForSocket(socketId)).toBeNull()
expect(roomManager.hasWorkflowRoom(workflowId)).toBe(false)
expect(roomManager.getWorkflowIdForSocket(socketId)).toBeUndefined()
})
})
@@ -285,7 +238,7 @@ describe('Socket Server Index Integration', () => {
it.concurrent('should properly import all extracted modules', async () => {
const { createSocketIOServer } = await import('@/socket/config/socket')
const { createHttpHandler } = await import('@/socket/routes/http')
const { MemoryRoomManager, RedisRoomManager } = await import('@/socket/rooms')
const { RoomManager } = await import('@/socket/rooms/manager')
const { authenticateSocket } = await import('@/socket/middleware/auth')
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
const { getWorkflowState } = await import('@/socket/database/operations')
@@ -293,23 +246,22 @@ describe('Socket Server Index Integration', () => {
expect(createSocketIOServer).toBeTypeOf('function')
expect(createHttpHandler).toBeTypeOf('function')
expect(MemoryRoomManager).toBeTypeOf('function')
expect(RedisRoomManager).toBeTypeOf('function')
expect(RoomManager).toBeTypeOf('function')
expect(authenticateSocket).toBeTypeOf('function')
expect(verifyWorkflowAccess).toBeTypeOf('function')
expect(getWorkflowState).toBeTypeOf('function')
expect(WorkflowOperationSchema).toBeDefined()
})
it.concurrent('should maintain all original functionality after refactoring', async () => {
it.concurrent('should maintain all original functionality after refactoring', () => {
expect(httpServer).toBeDefined()
expect(io).toBeDefined()
expect(roomManager).toBeDefined()
expect(typeof roomManager.addUserToRoom).toBe('function')
expect(typeof roomManager.removeUserFromRoom).toBe('function')
expect(typeof roomManager.createWorkflowRoom).toBe('function')
expect(typeof roomManager.cleanupUserFromRoom).toBe('function')
expect(typeof roomManager.handleWorkflowDeletion).toBe('function')
expect(typeof roomManager.broadcastPresenceUpdate).toBe('function')
expect(typeof roomManager.validateWorkflowConsistency).toBe('function')
})
})
@@ -334,7 +286,6 @@ describe('Socket Server Index Integration', () => {
it('should have shutdown capability', () => {
expect(typeof httpServer.close).toBe('function')
expect(typeof io.close).toBe('function')
expect(typeof roomManager.shutdown).toBe('function')
})
})

View File

@@ -1,125 +1,112 @@
import { createServer } from 'http'
import { createLogger } from '@sim/logger'
import type { Server as SocketIOServer } from 'socket.io'
import { env } from '@/lib/core/config/env'
import { createSocketIOServer, shutdownSocketIOAdapter } from '@/socket/config/socket'
import { createSocketIOServer } from '@/socket/config/socket'
import { setupAllHandlers } from '@/socket/handlers'
import { type AuthenticatedSocket, authenticateSocket } from '@/socket/middleware/auth'
import { type IRoomManager, MemoryRoomManager, RedisRoomManager } from '@/socket/rooms'
import { RoomManager } from '@/socket/rooms/manager'
import { createHttpHandler } from '@/socket/routes/http'
const logger = createLogger('CollaborativeSocketServer')
/** Maximum time to wait for graceful shutdown before forcing exit */
const SHUTDOWN_TIMEOUT_MS = 10000
// Enhanced server configuration - HTTP server will be configured with handler after all dependencies are set up
const httpServer = createServer()
async function createRoomManager(io: SocketIOServer): Promise<IRoomManager> {
if (env.REDIS_URL) {
logger.info('Initializing Redis-backed RoomManager for multi-pod support')
const manager = new RedisRoomManager(io, env.REDIS_URL)
await manager.initialize()
return manager
}
const io = createSocketIOServer(httpServer)
logger.warn('No REDIS_URL configured - using in-memory RoomManager (single-pod only)')
const manager = new MemoryRoomManager(io)
await manager.initialize()
return manager
}
// Initialize room manager after io is created
const roomManager = new RoomManager(io)
async function main() {
const httpServer = createServer()
const PORT = Number(env.PORT || env.SOCKET_PORT || 3002)
io.use(authenticateSocket)
logger.info('Starting Socket.IO server...', {
port: PORT,
nodeEnv: env.NODE_ENV,
hasDatabase: !!env.DATABASE_URL,
hasAuth: !!env.BETTER_AUTH_SECRET,
hasRedis: !!env.REDIS_URL,
const httpHandler = createHttpHandler(roomManager, logger)
httpServer.on('request', httpHandler)
process.on('uncaughtException', (error) => {
logger.error('Uncaught Exception:', error)
// Don't exit in production, just log
})
process.on('unhandledRejection', (reason, promise) => {
logger.error('Unhandled Rejection at:', promise, 'reason:', reason)
})
httpServer.on('error', (error) => {
logger.error('HTTP server error:', error)
})
io.engine.on('connection_error', (err) => {
logger.error('Socket.IO connection error:', {
req: err.req?.url,
code: err.code,
message: err.message,
context: err.context,
})
})
// Create Socket.IO server with Redis adapter if configured
const io = await createSocketIOServer(httpServer)
io.on('connection', (socket: AuthenticatedSocket) => {
logger.info(`New socket connection: ${socket.id}`)
// Initialize room manager (Redis or in-memory based on config)
const roomManager = await createRoomManager(io)
setupAllHandlers(socket, roomManager)
})
// Set up authentication middleware
io.use(authenticateSocket)
// Set up HTTP handler for health checks and internal APIs
const httpHandler = createHttpHandler(roomManager, logger)
httpServer.on('request', httpHandler)
// Global error handlers
process.on('uncaughtException', (error) => {
logger.error('Uncaught Exception:', error)
httpServer.on('request', (req, res) => {
logger.info(`🌐 HTTP Request: ${req.method} ${req.url}`, {
method: req.method,
url: req.url,
userAgent: req.headers['user-agent'],
origin: req.headers.origin,
host: req.headers.host,
timestamp: new Date().toISOString(),
})
})
process.on('unhandledRejection', (reason, promise) => {
logger.error('Unhandled Rejection at:', promise, 'reason:', reason)
io.engine.on('connection_error', (err) => {
logger.error('❌ Engine.IO Connection error:', {
code: err.code,
message: err.message,
context: err.context,
req: err.req
? {
url: err.req.url,
method: err.req.method,
headers: err.req.headers,
}
: 'No request object',
})
})
httpServer.on('error', (error: NodeJS.ErrnoException) => {
logger.error('HTTP server error:', error)
if (error.code === 'EADDRINUSE' || error.code === 'EACCES') {
process.exit(1)
}
})
const PORT = Number(env.PORT || env.SOCKET_PORT || 3002)
io.engine.on('connection_error', (err) => {
logger.error('Socket.IO connection error:', {
req: err.req?.url,
code: err.code,
message: err.message,
context: err.context,
})
})
logger.info('Starting Socket.IO server...', {
port: PORT,
nodeEnv: env.NODE_ENV,
hasDatabase: !!env.DATABASE_URL,
hasAuth: !!env.BETTER_AUTH_SECRET,
})
io.on('connection', (socket: AuthenticatedSocket) => {
logger.info(`New socket connection: ${socket.id}`)
setupAllHandlers(socket, roomManager)
})
httpServer.listen(PORT, '0.0.0.0', () => {
logger.info(`Socket.IO server running on port ${PORT}`)
logger.info(`🏥 Health check available at: http://localhost:${PORT}/health`)
})
httpServer.listen(PORT, '0.0.0.0', () => {
logger.info(`Socket.IO server running on port ${PORT}`)
logger.info(`Health check available at: http://localhost:${PORT}/health`)
})
const shutdown = async () => {
logger.info('Shutting down Socket.IO server...')
try {
await roomManager.shutdown()
logger.info('RoomManager shutdown complete')
} catch (error) {
logger.error('Error during RoomManager shutdown:', error)
}
try {
await shutdownSocketIOAdapter()
} catch (error) {
logger.error('Error during Socket.IO adapter shutdown:', error)
}
httpServer.close(() => {
logger.info('Socket.IO server closed')
process.exit(0)
})
setTimeout(() => {
logger.error('Forced shutdown after timeout')
process.exit(1)
}, SHUTDOWN_TIMEOUT_MS)
}
process.on('SIGINT', shutdown)
process.on('SIGTERM', shutdown)
}
// Start the server
main().catch((error) => {
logger.error('Failed to start server:', error)
httpServer.on('error', (error) => {
logger.error('❌ Server failed to start:', error)
process.exit(1)
})
process.on('SIGINT', () => {
logger.info('Shutting down Socket.IO server...')
httpServer.close(() => {
logger.info('Socket.IO server closed')
process.exit(0)
})
})
process.on('SIGTERM', () => {
logger.info('Shutting down Socket.IO server...')
httpServer.close(() => {
logger.info('Socket.IO server closed')
process.exit(0)
})
})
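
One side of the bootstrap above funnels SIGINT and SIGTERM through a single shutdown routine that runs async cleanup (room manager, Redis adapter), closes the HTTP server, and force-exits after SHUTDOWN_TIMEOUT_MS. A minimal sketch of that pattern; the cleanup hooks and the 10 s default here are placeholders:

import type { Server as HttpServer } from 'http'

// Run async cleanup hooks, close the HTTP server, and force-exit if anything
// hangs past the timeout so the process is never stuck terminating.
function installShutdownHandlers(
  httpServer: HttpServer,
  cleanupHooks: Array<() => Promise<void>>,
  timeoutMs = 10_000
): void {
  const shutdown = async () => {
    for (const hook of cleanupHooks) {
      try {
        await hook()
      } catch (error) {
        console.error('Cleanup hook failed during shutdown:', error)
      }
    }
    httpServer.close(() => process.exit(0))
    // Last resort: do not hang forever if a connection refuses to close.
    setTimeout(() => process.exit(1), timeoutMs).unref()
  }
  process.on('SIGINT', shutdown)
  process.on('SIGTERM', shutdown)
}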

View File

@@ -21,7 +21,7 @@ export interface AuthenticatedSocket extends Socket {
* Socket.IO authentication middleware.
* Handles both anonymous mode (DISABLE_AUTH=true) and normal token-based auth.
*/
export async function authenticateSocket(socket: AuthenticatedSocket, next: (err?: Error) => void) {
export async function authenticateSocket(socket: AuthenticatedSocket, next: any) {
try {
if (isAuthDisabled) {
socket.userId = ANONYMOUS_USER_ID

View File

@@ -73,7 +73,7 @@ export function checkRolePermission(
return { allowed: true }
}
async function verifyWorkspaceMembership(
export async function verifyWorkspaceMembership(
userId: string,
workspaceId: string
): Promise<string | null> {

View File

@@ -1,3 +0,0 @@
export { MemoryRoomManager } from '@/socket/rooms/memory-manager'
export { RedisRoomManager } from '@/socket/rooms/redis-manager'
export type { IRoomManager, UserPresence, UserSession, WorkflowRoom } from '@/socket/rooms/types'

View File

@@ -0,0 +1,291 @@
import * as schema from '@sim/db/schema'
import { workflowBlocks, workflowEdges } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull } from 'drizzle-orm'
import { drizzle } from 'drizzle-orm/postgres-js'
import postgres from 'postgres'
import type { Server } from 'socket.io'
import { env } from '@/lib/core/config/env'
const connectionString = env.DATABASE_URL
const db = drizzle(
postgres(connectionString, {
prepare: false,
idle_timeout: 15,
connect_timeout: 20,
max: 3,
onnotice: () => {},
}),
{ schema }
)
const logger = createLogger('RoomManager')
export interface UserPresence {
userId: string
workflowId: string
userName: string
socketId: string
joinedAt: number
lastActivity: number
role: string
cursor?: { x: number; y: number }
selection?: { type: 'block' | 'edge' | 'none'; id?: string }
avatarUrl?: string | null
}
export interface WorkflowRoom {
workflowId: string
users: Map<string, UserPresence> // socketId -> UserPresence
lastModified: number
activeConnections: number
}
export class RoomManager {
private workflowRooms = new Map<string, WorkflowRoom>()
private socketToWorkflow = new Map<string, string>()
private userSessions = new Map<
string,
{ userId: string; userName: string; avatarUrl?: string | null }
>()
private io: Server
constructor(io: Server) {
this.io = io
}
createWorkflowRoom(workflowId: string): WorkflowRoom {
return {
workflowId,
users: new Map(),
lastModified: Date.now(),
activeConnections: 0,
}
}
cleanupUserFromRoom(socketId: string, workflowId: string) {
const room = this.workflowRooms.get(workflowId)
if (room) {
room.users.delete(socketId)
room.activeConnections = Math.max(0, room.activeConnections - 1)
if (room.activeConnections === 0) {
this.workflowRooms.delete(workflowId)
logger.info(`Cleaned up empty workflow room: ${workflowId}`)
}
}
this.socketToWorkflow.delete(socketId)
this.userSessions.delete(socketId)
}
handleWorkflowDeletion(workflowId: string) {
logger.info(`Handling workflow deletion notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for deleted workflow ${workflowId}`)
return
}
this.io.to(workflowId).emit('workflow-deleted', {
workflowId,
message: 'This workflow has been deleted',
timestamp: Date.now(),
})
const socketsToDisconnect: string[] = []
room.users.forEach((_presence, socketId) => {
socketsToDisconnect.push(socketId)
})
socketsToDisconnect.forEach((socketId) => {
const socket = this.io.sockets.sockets.get(socketId)
if (socket) {
socket.leave(workflowId)
logger.debug(`Disconnected socket ${socketId} from deleted workflow ${workflowId}`)
}
this.cleanupUserFromRoom(socketId, workflowId)
})
this.workflowRooms.delete(workflowId)
logger.info(
`Cleaned up workflow room ${workflowId} after deletion (${socketsToDisconnect.length} users disconnected)`
)
}
handleWorkflowRevert(workflowId: string, timestamp: number) {
logger.info(`Handling workflow revert notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for reverted workflow ${workflowId}`)
return
}
this.io.to(workflowId).emit('workflow-reverted', {
workflowId,
message: 'Workflow has been reverted to deployed state',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about workflow revert: ${workflowId}`)
}
handleWorkflowUpdate(workflowId: string) {
logger.info(`Handling workflow update notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for updated workflow ${workflowId}`)
return
}
const timestamp = Date.now()
// Notify all clients in the workflow room that the workflow has been updated
// This will trigger them to refresh their local state
this.io.to(workflowId).emit('workflow-updated', {
workflowId,
message: 'Workflow has been updated externally',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about workflow update: ${workflowId}`)
}
handleCopilotWorkflowEdit(workflowId: string, description?: string) {
logger.info(`Handling copilot workflow edit notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for copilot workflow edit ${workflowId}`)
return
}
const timestamp = Date.now()
// Emit a special event for copilot edits that tells clients to rehydrate from the database
this.io.to(workflowId).emit('copilot-workflow-edit', {
workflowId,
description,
message: 'Copilot has edited the workflow - rehydrating from database',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about copilot workflow edit: ${workflowId}`)
}
async validateWorkflowConsistency(
workflowId: string
): Promise<{ valid: boolean; issues: string[] }> {
try {
const issues: string[] = []
const orphanedEdges = await db
.select({
id: workflowEdges.id,
sourceBlockId: workflowEdges.sourceBlockId,
targetBlockId: workflowEdges.targetBlockId,
})
.from(workflowEdges)
.leftJoin(workflowBlocks, eq(workflowEdges.sourceBlockId, workflowBlocks.id))
.where(and(eq(workflowEdges.workflowId, workflowId), isNull(workflowBlocks.id)))
if (orphanedEdges.length > 0) {
issues.push(`Found ${orphanedEdges.length} orphaned edges with missing source blocks`)
}
return { valid: issues.length === 0, issues }
} catch (error) {
logger.error('Error validating workflow consistency:', error)
return { valid: false, issues: ['Consistency check failed'] }
}
}
getWorkflowRooms(): ReadonlyMap<string, WorkflowRoom> {
return this.workflowRooms
}
getSocketToWorkflow(): ReadonlyMap<string, string> {
return this.socketToWorkflow
}
getUserSessions(): ReadonlyMap<string, { userId: string; userName: string }> {
return this.userSessions
}
hasWorkflowRoom(workflowId: string): boolean {
return this.workflowRooms.has(workflowId)
}
getWorkflowRoom(workflowId: string): WorkflowRoom | undefined {
return this.workflowRooms.get(workflowId)
}
setWorkflowRoom(workflowId: string, room: WorkflowRoom): void {
this.workflowRooms.set(workflowId, room)
}
getWorkflowIdForSocket(socketId: string): string | undefined {
return this.socketToWorkflow.get(socketId)
}
setWorkflowForSocket(socketId: string, workflowId: string): void {
this.socketToWorkflow.set(socketId, workflowId)
}
getUserSession(
socketId: string
): { userId: string; userName: string; avatarUrl?: string | null } | undefined {
return this.userSessions.get(socketId)
}
setUserSession(
socketId: string,
session: { userId: string; userName: string; avatarUrl?: string | null }
): void {
this.userSessions.set(socketId, session)
}
getTotalActiveConnections(): number {
return Array.from(this.workflowRooms.values()).reduce(
(total, room) => total + room.activeConnections,
0
)
}
broadcastPresenceUpdate(workflowId: string): void {
const room = this.workflowRooms.get(workflowId)
if (room) {
const roomPresence = Array.from(room.users.values())
this.io.to(workflowId).emit('presence-update', roomPresence)
}
}
emitToWorkflow<T = unknown>(workflowId: string, event: string, payload: T): void {
this.io.to(workflowId).emit(event, payload)
}
/**
* Get the number of unique users in a workflow room
* (not the number of socket connections)
*/
getUniqueUserCount(workflowId: string): number {
const room = this.workflowRooms.get(workflowId)
if (!room) return 0
const uniqueUsers = new Set<string>()
room.users.forEach((presence) => {
uniqueUsers.add(presence.userId)
})
return uniqueUsers.size
}
}
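
As a rough usage sketch of the class above: only the RoomManager API shown in this file comes from the diff; the HTTP and Socket.IO bootstrapping is assumed.

import { createServer } from 'http'
import { Server } from 'socket.io'
import { RoomManager } from '@/socket/rooms/manager'

const httpServer = createServer()
const io = new Server(httpServer)
const roomManager = new RoomManager(io)

// When the main API reports an external change, notify every client currently
// editing that workflow and rebroadcast the room's presence list.
function onWorkflowChanged(workflowId: string): void {
  roomManager.handleWorkflowUpdate(workflowId)
  roomManager.broadcastPresenceUpdate(workflowId)
}

httpServer.listen(3002)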


@@ -1,260 +0,0 @@
import { createLogger } from '@sim/logger'
import type { Server } from 'socket.io'
import type { IRoomManager, UserPresence, UserSession, WorkflowRoom } from '@/socket/rooms/types'
const logger = createLogger('MemoryRoomManager')
/**
* In-memory room manager for single-pod deployments
* Used as fallback when REDIS_URL is not configured
*/
export class MemoryRoomManager implements IRoomManager {
private workflowRooms = new Map<string, WorkflowRoom>()
private socketToWorkflow = new Map<string, string>()
private userSessions = new Map<string, UserSession>()
private _io: Server
constructor(io: Server) {
this._io = io
}
get io(): Server {
return this._io
}
async initialize(): Promise<void> {
logger.info('MemoryRoomManager initialized (single-pod mode)')
}
async shutdown(): Promise<void> {
this.workflowRooms.clear()
this.socketToWorkflow.clear()
this.userSessions.clear()
logger.info('MemoryRoomManager shutdown complete')
}
async addUserToRoom(workflowId: string, socketId: string, presence: UserPresence): Promise<void> {
// Create room if it doesn't exist
if (!this.workflowRooms.has(workflowId)) {
this.workflowRooms.set(workflowId, {
workflowId,
users: new Map(),
lastModified: Date.now(),
activeConnections: 0,
})
}
const room = this.workflowRooms.get(workflowId)!
room.users.set(socketId, presence)
room.activeConnections++
room.lastModified = Date.now()
// Map socket to workflow
this.socketToWorkflow.set(socketId, workflowId)
// Store session
this.userSessions.set(socketId, {
userId: presence.userId,
userName: presence.userName,
avatarUrl: presence.avatarUrl,
})
logger.debug(`Added user ${presence.userId} to workflow ${workflowId} (socket: ${socketId})`)
}
async removeUserFromRoom(socketId: string): Promise<string | null> {
const workflowId = this.socketToWorkflow.get(socketId)
if (!workflowId) {
return null
}
const room = this.workflowRooms.get(workflowId)
if (room) {
room.users.delete(socketId)
room.activeConnections = Math.max(0, room.activeConnections - 1)
// Clean up empty rooms
if (room.activeConnections === 0) {
this.workflowRooms.delete(workflowId)
logger.info(`Cleaned up empty workflow room: ${workflowId}`)
}
}
this.socketToWorkflow.delete(socketId)
this.userSessions.delete(socketId)
logger.debug(`Removed socket ${socketId} from workflow ${workflowId}`)
return workflowId
}
async getWorkflowIdForSocket(socketId: string): Promise<string | null> {
return this.socketToWorkflow.get(socketId) ?? null
}
async getUserSession(socketId: string): Promise<UserSession | null> {
return this.userSessions.get(socketId) ?? null
}
async getWorkflowUsers(workflowId: string): Promise<UserPresence[]> {
const room = this.workflowRooms.get(workflowId)
if (!room) return []
return Array.from(room.users.values())
}
async hasWorkflowRoom(workflowId: string): Promise<boolean> {
return this.workflowRooms.has(workflowId)
}
async updateUserActivity(
workflowId: string,
socketId: string,
updates: Partial<Pick<UserPresence, 'cursor' | 'selection' | 'lastActivity'>>
): Promise<void> {
const room = this.workflowRooms.get(workflowId)
if (!room) return
const presence = room.users.get(socketId)
if (presence) {
if (updates.cursor !== undefined) presence.cursor = updates.cursor
if (updates.selection !== undefined) presence.selection = updates.selection
presence.lastActivity = updates.lastActivity ?? Date.now()
}
}
async updateRoomLastModified(workflowId: string): Promise<void> {
const room = this.workflowRooms.get(workflowId)
if (room) {
room.lastModified = Date.now()
}
}
async broadcastPresenceUpdate(workflowId: string): Promise<void> {
const users = await this.getWorkflowUsers(workflowId)
this._io.to(workflowId).emit('presence-update', users)
}
emitToWorkflow<T = unknown>(workflowId: string, event: string, payload: T): void {
this._io.to(workflowId).emit(event, payload)
}
async getUniqueUserCount(workflowId: string): Promise<number> {
const room = this.workflowRooms.get(workflowId)
if (!room) return 0
const uniqueUsers = new Set<string>()
room.users.forEach((presence) => {
uniqueUsers.add(presence.userId)
})
return uniqueUsers.size
}
async getTotalActiveConnections(): Promise<number> {
let total = 0
for (const room of this.workflowRooms.values()) {
total += room.activeConnections
}
return total
}
async handleWorkflowDeletion(workflowId: string): Promise<void> {
logger.info(`Handling workflow deletion notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for deleted workflow ${workflowId}`)
return
}
this._io.to(workflowId).emit('workflow-deleted', {
workflowId,
message: 'This workflow has been deleted',
timestamp: Date.now(),
})
const socketsToDisconnect: string[] = []
room.users.forEach((_presence, socketId) => {
socketsToDisconnect.push(socketId)
})
for (const socketId of socketsToDisconnect) {
const socket = this._io.sockets.sockets.get(socketId)
if (socket) {
socket.leave(workflowId)
logger.debug(`Disconnected socket ${socketId} from deleted workflow ${workflowId}`)
}
await this.removeUserFromRoom(socketId)
}
this.workflowRooms.delete(workflowId)
logger.info(
`Cleaned up workflow room ${workflowId} after deletion (${socketsToDisconnect.length} users disconnected)`
)
}
async handleWorkflowRevert(workflowId: string, timestamp: number): Promise<void> {
logger.info(`Handling workflow revert notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for reverted workflow ${workflowId}`)
return
}
this._io.to(workflowId).emit('workflow-reverted', {
workflowId,
message: 'Workflow has been reverted to deployed state',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about workflow revert: ${workflowId}`)
}
async handleWorkflowUpdate(workflowId: string): Promise<void> {
logger.info(`Handling workflow update notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for updated workflow ${workflowId}`)
return
}
const timestamp = Date.now()
this._io.to(workflowId).emit('workflow-updated', {
workflowId,
message: 'Workflow has been updated externally',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about workflow update: ${workflowId}`)
}
async handleCopilotWorkflowEdit(workflowId: string, description?: string): Promise<void> {
logger.info(`Handling copilot workflow edit notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for copilot workflow edit ${workflowId}`)
return
}
const timestamp = Date.now()
this._io.to(workflowId).emit('copilot-workflow-edit', {
workflowId,
description,
message: 'Copilot has edited the workflow - rehydrating from database',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about copilot workflow edit: ${workflowId}`)
}
}
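
Based on the doc comment above ("used as fallback when REDIS_URL is not configured"), the two removed managers were presumably selected at startup roughly as in the hedged sketch below; the createRoomManager helper is hypothetical.

import type { Server } from 'socket.io'
import { MemoryRoomManager } from '@/socket/rooms/memory-manager'
import { RedisRoomManager } from '@/socket/rooms/redis-manager'
import type { IRoomManager } from '@/socket/rooms/types'

// Prefer the Redis-backed manager for multi-pod deployments and fall back to
// the in-memory manager when no Redis URL is configured (single-pod mode).
async function createRoomManager(io: Server, redisUrl?: string): Promise<IRoomManager> {
  const manager: IRoomManager = redisUrl
    ? new RedisRoomManager(io, redisUrl)
    : new MemoryRoomManager(io)
  await manager.initialize()
  return manager
}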


@@ -1,434 +0,0 @@
import { createLogger } from '@sim/logger'
import { createClient, type RedisClientType } from 'redis'
import type { Server } from 'socket.io'
import type { IRoomManager, UserPresence, UserSession } from '@/socket/rooms/types'
const logger = createLogger('RedisRoomManager')
const KEYS = {
workflowUsers: (wfId: string) => `workflow:${wfId}:users`,
workflowMeta: (wfId: string) => `workflow:${wfId}:meta`,
socketWorkflow: (socketId: string) => `socket:${socketId}:workflow`,
socketSession: (socketId: string) => `socket:${socketId}:session`,
} as const
const SOCKET_KEY_TTL = 3600
/**
* Lua script for atomic user removal from room.
* Returns workflowId if user was removed, null otherwise.
* Handles room cleanup atomically to prevent race conditions.
*/
const REMOVE_USER_SCRIPT = `
local socketWorkflowKey = KEYS[1]
local socketSessionKey = KEYS[2]
local workflowUsersPrefix = ARGV[1]
local workflowMetaPrefix = ARGV[2]
local socketId = ARGV[3]
local workflowId = redis.call('GET', socketWorkflowKey)
if not workflowId then
return nil
end
local workflowUsersKey = workflowUsersPrefix .. workflowId .. ':users'
local workflowMetaKey = workflowMetaPrefix .. workflowId .. ':meta'
redis.call('HDEL', workflowUsersKey, socketId)
redis.call('DEL', socketWorkflowKey, socketSessionKey)
local remaining = redis.call('HLEN', workflowUsersKey)
if remaining == 0 then
redis.call('DEL', workflowUsersKey, workflowMetaKey)
end
return workflowId
`
/**
* Lua script for atomic user activity update.
* Performs read-modify-write atomically to prevent lost updates.
* Also refreshes TTL on socket keys to prevent expiry during long sessions.
*/
const UPDATE_ACTIVITY_SCRIPT = `
local workflowUsersKey = KEYS[1]
local socketWorkflowKey = KEYS[2]
local socketSessionKey = KEYS[3]
local socketId = ARGV[1]
local cursorJson = ARGV[2]
local selectionJson = ARGV[3]
local lastActivity = ARGV[4]
local ttl = tonumber(ARGV[5])
local existingJson = redis.call('HGET', workflowUsersKey, socketId)
if not existingJson then
return 0
end
local existing = cjson.decode(existingJson)
if cursorJson ~= '' then
existing.cursor = cjson.decode(cursorJson)
end
if selectionJson ~= '' then
existing.selection = cjson.decode(selectionJson)
end
existing.lastActivity = tonumber(lastActivity)
redis.call('HSET', workflowUsersKey, socketId, cjson.encode(existing))
redis.call('EXPIRE', socketWorkflowKey, ttl)
redis.call('EXPIRE', socketSessionKey, ttl)
return 1
`
/**
* Redis-backed room manager for multi-pod deployments.
* Uses Lua scripts for atomic operations to prevent race conditions.
*/
export class RedisRoomManager implements IRoomManager {
private redis: RedisClientType
private _io: Server
private isConnected = false
private removeUserScriptSha: string | null = null
private updateActivityScriptSha: string | null = null
constructor(io: Server, redisUrl: string) {
this._io = io
this.redis = createClient({
url: redisUrl,
socket: {
reconnectStrategy: (retries) => {
if (retries > 10) {
logger.error('Redis reconnection failed after 10 attempts')
return new Error('Redis reconnection failed')
}
const delay = Math.min(retries * 100, 3000)
logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`)
return delay
},
},
})
this.redis.on('error', (err) => {
logger.error('Redis client error:', err)
})
this.redis.on('reconnecting', () => {
logger.warn('Redis client reconnecting...')
this.isConnected = false
})
this.redis.on('ready', () => {
logger.info('Redis client ready')
this.isConnected = true
})
}
get io(): Server {
return this._io
}
async initialize(): Promise<void> {
if (this.isConnected) return
try {
await this.redis.connect()
this.isConnected = true
// Pre-load Lua scripts for better performance
this.removeUserScriptSha = await this.redis.scriptLoad(REMOVE_USER_SCRIPT)
this.updateActivityScriptSha = await this.redis.scriptLoad(UPDATE_ACTIVITY_SCRIPT)
logger.info('RedisRoomManager connected to Redis and scripts loaded')
} catch (error) {
logger.error('Failed to connect to Redis:', error)
throw error
}
}
async shutdown(): Promise<void> {
if (!this.isConnected) return
try {
await this.redis.quit()
this.isConnected = false
logger.info('RedisRoomManager disconnected from Redis')
} catch (error) {
logger.error('Error during Redis shutdown:', error)
}
}
async addUserToRoom(workflowId: string, socketId: string, presence: UserPresence): Promise<void> {
try {
const pipeline = this.redis.multi()
pipeline.hSet(KEYS.workflowUsers(workflowId), socketId, JSON.stringify(presence))
pipeline.hSet(KEYS.workflowMeta(workflowId), 'lastModified', Date.now().toString())
pipeline.set(KEYS.socketWorkflow(socketId), workflowId)
pipeline.expire(KEYS.socketWorkflow(socketId), SOCKET_KEY_TTL)
pipeline.hSet(KEYS.socketSession(socketId), {
userId: presence.userId,
userName: presence.userName,
avatarUrl: presence.avatarUrl || '',
})
pipeline.expire(KEYS.socketSession(socketId), SOCKET_KEY_TTL)
const results = await pipeline.exec()
// Check if any command failed
const failed = results.some((result) => result instanceof Error)
if (failed) {
logger.error(`Pipeline partially failed when adding user to room`, { workflowId, socketId })
throw new Error('Failed to store user session data in Redis')
}
logger.debug(`Added user ${presence.userId} to workflow ${workflowId} (socket: ${socketId})`)
} catch (error) {
logger.error(`Failed to add user to room: ${socketId} -> ${workflowId}`, error)
throw error
}
}
async removeUserFromRoom(socketId: string, retried = false): Promise<string | null> {
if (!this.removeUserScriptSha) {
logger.error('removeUserFromRoom called before initialize()')
return null
}
try {
const workflowId = await this.redis.evalSha(this.removeUserScriptSha, {
keys: [KEYS.socketWorkflow(socketId), KEYS.socketSession(socketId)],
arguments: ['workflow:', 'workflow:', socketId],
})
if (workflowId) {
logger.debug(`Removed socket ${socketId} from workflow ${workflowId}`)
}
return workflowId as string | null
} catch (error) {
if ((error as Error).message?.includes('NOSCRIPT') && !retried) {
logger.warn('Lua script not found, reloading...')
this.removeUserScriptSha = await this.redis.scriptLoad(REMOVE_USER_SCRIPT)
return this.removeUserFromRoom(socketId, true)
}
logger.error(`Failed to remove user from room: ${socketId}`, error)
return null
}
}
async getWorkflowIdForSocket(socketId: string): Promise<string | null> {
return this.redis.get(KEYS.socketWorkflow(socketId))
}
async getUserSession(socketId: string): Promise<UserSession | null> {
try {
const session = await this.redis.hGetAll(KEYS.socketSession(socketId))
if (!session.userId) {
return null
}
return {
userId: session.userId,
userName: session.userName,
avatarUrl: session.avatarUrl || undefined,
}
} catch (error) {
logger.error(`Failed to get user session for ${socketId}:`, error)
return null
}
}
async getWorkflowUsers(workflowId: string): Promise<UserPresence[]> {
try {
const users = await this.redis.hGetAll(KEYS.workflowUsers(workflowId))
return Object.entries(users)
.map(([socketId, json]) => {
try {
return JSON.parse(json) as UserPresence
} catch {
logger.warn(`Corrupted user data for socket ${socketId}, skipping`)
return null
}
})
.filter((u): u is UserPresence => u !== null)
} catch (error) {
logger.error(`Failed to get workflow users for ${workflowId}:`, error)
return []
}
}
async hasWorkflowRoom(workflowId: string): Promise<boolean> {
const exists = await this.redis.exists(KEYS.workflowUsers(workflowId))
return exists > 0
}
async updateUserActivity(
workflowId: string,
socketId: string,
updates: Partial<Pick<UserPresence, 'cursor' | 'selection' | 'lastActivity'>>,
retried = false
): Promise<void> {
if (!this.updateActivityScriptSha) {
logger.error('updateUserActivity called before initialize()')
return
}
try {
await this.redis.evalSha(this.updateActivityScriptSha, {
keys: [
KEYS.workflowUsers(workflowId),
KEYS.socketWorkflow(socketId),
KEYS.socketSession(socketId),
],
arguments: [
socketId,
updates.cursor !== undefined ? JSON.stringify(updates.cursor) : '',
updates.selection !== undefined ? JSON.stringify(updates.selection) : '',
(updates.lastActivity ?? Date.now()).toString(),
SOCKET_KEY_TTL.toString(),
],
})
} catch (error) {
if ((error as Error).message?.includes('NOSCRIPT') && !retried) {
logger.warn('Lua script not found, reloading...')
this.updateActivityScriptSha = await this.redis.scriptLoad(UPDATE_ACTIVITY_SCRIPT)
return this.updateUserActivity(workflowId, socketId, updates, true)
}
logger.error(`Failed to update user activity: ${socketId}`, error)
}
}
async updateRoomLastModified(workflowId: string): Promise<void> {
await this.redis.hSet(KEYS.workflowMeta(workflowId), 'lastModified', Date.now().toString())
}
async broadcastPresenceUpdate(workflowId: string): Promise<void> {
const users = await this.getWorkflowUsers(workflowId)
// io.to() with Redis adapter broadcasts to all pods
this._io.to(workflowId).emit('presence-update', users)
}
emitToWorkflow<T = unknown>(workflowId: string, event: string, payload: T): void {
this._io.to(workflowId).emit(event, payload)
}
async getUniqueUserCount(workflowId: string): Promise<number> {
const users = await this.getWorkflowUsers(workflowId)
const uniqueUserIds = new Set(users.map((u) => u.userId))
return uniqueUserIds.size
}
async getTotalActiveConnections(): Promise<number> {
// This is more complex with Redis - we'd need to scan all workflow:*:users keys
// For now, just count sockets in this server instance
// The true count would require aggregating across all pods
return this._io.sockets.sockets.size
}
async handleWorkflowDeletion(workflowId: string): Promise<void> {
logger.info(`Handling workflow deletion notification for ${workflowId}`)
try {
const users = await this.getWorkflowUsers(workflowId)
if (users.length === 0) {
logger.debug(`No active users found for deleted workflow ${workflowId}`)
return
}
// Notify all clients across all pods via Redis adapter
this._io.to(workflowId).emit('workflow-deleted', {
workflowId,
message: 'This workflow has been deleted',
timestamp: Date.now(),
})
// Use Socket.IO's cross-pod socketsLeave() to remove all sockets from the room
// This works across all pods when using the Redis adapter
await this._io.in(workflowId).socketsLeave(workflowId)
logger.debug(`All sockets left workflow room ${workflowId} via socketsLeave()`)
// Remove all users from Redis state
for (const user of users) {
await this.removeUserFromRoom(user.socketId)
}
// Clean up room data
await this.redis.del([KEYS.workflowUsers(workflowId), KEYS.workflowMeta(workflowId)])
logger.info(
`Cleaned up workflow room ${workflowId} after deletion (${users.length} users disconnected)`
)
} catch (error) {
logger.error(`Failed to handle workflow deletion for ${workflowId}:`, error)
}
}
async handleWorkflowRevert(workflowId: string, timestamp: number): Promise<void> {
logger.info(`Handling workflow revert notification for ${workflowId}`)
const hasRoom = await this.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`No active room found for reverted workflow ${workflowId}`)
return
}
this._io.to(workflowId).emit('workflow-reverted', {
workflowId,
message: 'Workflow has been reverted to deployed state',
timestamp,
})
await this.updateRoomLastModified(workflowId)
const userCount = await this.getUniqueUserCount(workflowId)
logger.info(`Notified ${userCount} users about workflow revert: ${workflowId}`)
}
async handleWorkflowUpdate(workflowId: string): Promise<void> {
logger.info(`Handling workflow update notification for ${workflowId}`)
const hasRoom = await this.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`No active room found for updated workflow ${workflowId}`)
return
}
const timestamp = Date.now()
this._io.to(workflowId).emit('workflow-updated', {
workflowId,
message: 'Workflow has been updated externally',
timestamp,
})
await this.updateRoomLastModified(workflowId)
const userCount = await this.getUniqueUserCount(workflowId)
logger.info(`Notified ${userCount} users about workflow update: ${workflowId}`)
}
async handleCopilotWorkflowEdit(workflowId: string, description?: string): Promise<void> {
logger.info(`Handling copilot workflow edit notification for ${workflowId}`)
const hasRoom = await this.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`No active room found for copilot workflow edit ${workflowId}`)
return
}
const timestamp = Date.now()
this._io.to(workflowId).emit('copilot-workflow-edit', {
workflowId,
description,
message: 'Copilot has edited the workflow - rehydrating from database',
timestamp,
})
await this.updateRoomLastModified(workflowId)
const userCount = await this.getUniqueUserCount(workflowId)
logger.info(`Notified ${userCount} users about copilot workflow edit: ${workflowId}`)
}
}
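
To make the Redis key layout above concrete, a small worked example for a single connection; the IDs are invented purely for illustration.

// Keys the removed RedisRoomManager kept for socket 'sock_abc' in workflow 'wf_123':
const workflowId = 'wf_123'
const socketId = 'sock_abc'

const keys = [
  `workflow:${workflowId}:users`, // hash: socketId -> JSON-encoded UserPresence
  `workflow:${workflowId}:meta`, // hash: lastModified timestamp
  `socket:${socketId}:workflow`, // string: workflowId, TTL 3600s
  `socket:${socketId}:session`, // hash: userId/userName/avatarUrl, TTL 3600s
]
console.log(keys.join('\n'))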


@@ -1,140 +0,0 @@
import type { Server } from 'socket.io'
/**
* User presence data stored in room state
*/
export interface UserPresence {
userId: string
workflowId: string
userName: string
socketId: string
tabSessionId?: string
joinedAt: number
lastActivity: number
role: string
cursor?: { x: number; y: number }
selection?: { type: 'block' | 'edge' | 'none'; id?: string }
avatarUrl?: string | null
}
/**
* User session data (minimal info for quick lookups)
*/
export interface UserSession {
userId: string
userName: string
avatarUrl?: string | null
}
/**
* Workflow room state
*/
export interface WorkflowRoom {
workflowId: string
users: Map<string, UserPresence>
lastModified: number
activeConnections: number
}
/**
* Common interface for room managers (in-memory and Redis)
* All methods that access state are async to support Redis operations
*/
export interface IRoomManager {
readonly io: Server
/**
* Initialize the room manager (connect to Redis, etc.)
*/
initialize(): Promise<void>
/**
* Clean shutdown
*/
shutdown(): Promise<void>
/**
* Add a user to a workflow room
*/
addUserToRoom(workflowId: string, socketId: string, presence: UserPresence): Promise<void>
/**
* Remove a user from their current room
* Returns the workflowId they were in, or null if not in any room
*/
removeUserFromRoom(socketId: string): Promise<string | null>
/**
* Get the workflow ID for a socket
*/
getWorkflowIdForSocket(socketId: string): Promise<string | null>
/**
* Get user session data for a socket
*/
getUserSession(socketId: string): Promise<UserSession | null>
/**
* Get all users in a workflow room
*/
getWorkflowUsers(workflowId: string): Promise<UserPresence[]>
/**
* Check if a workflow room exists
*/
hasWorkflowRoom(workflowId: string): Promise<boolean>
/**
* Update user activity (cursor, selection, lastActivity)
*/
updateUserActivity(
workflowId: string,
socketId: string,
updates: Partial<Pick<UserPresence, 'cursor' | 'selection' | 'lastActivity'>>
): Promise<void>
/**
* Update room's lastModified timestamp
*/
updateRoomLastModified(workflowId: string): Promise<void>
/**
* Broadcast presence update to all clients in a workflow room
*/
broadcastPresenceUpdate(workflowId: string): Promise<void>
/**
* Emit an event to all clients in a workflow room
*/
emitToWorkflow<T = unknown>(workflowId: string, event: string, payload: T): void
/**
* Get the number of unique users in a workflow room
*/
getUniqueUserCount(workflowId: string): Promise<number>
/**
* Get total active connections across all rooms
*/
getTotalActiveConnections(): Promise<number>
/**
* Handle workflow deletion - notify users and clean up room
*/
handleWorkflowDeletion(workflowId: string): Promise<void>
/**
* Handle workflow revert - notify users
*/
handleWorkflowRevert(workflowId: string, timestamp: number): Promise<void>
/**
* Handle workflow update - notify users
*/
handleWorkflowUpdate(workflowId: string): Promise<void>
/**
* Handle copilot workflow edit - notify users to rehydrate
*/
handleCopilotWorkflowEdit(workflowId: string, description?: string): Promise<void>
}
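
Because every state-accessing method on the interface is async, callers can be written once against IRoomManager and work with either backend; a brief illustrative helper (not part of the removed file):

import type { IRoomManager } from '@/socket/rooms/types'

// Log basic room statistics regardless of whether the manager is in-memory or Redis-backed.
async function logRoomStats(manager: IRoomManager, workflowId: string): Promise<void> {
  const users = await manager.getWorkflowUsers(workflowId)
  const uniqueUsers = await manager.getUniqueUserCount(workflowId)
  console.log(`${workflowId}: ${users.length} sockets, ${uniqueUsers} unique users`)
}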


@@ -1,52 +1,11 @@
import type { IncomingMessage, ServerResponse } from 'http'
import { env } from '@/lib/core/config/env'
import type { IRoomManager } from '@/socket/rooms'
import type { RoomManager } from '@/socket/rooms/manager'
interface Logger {
info: (message: string, ...args: unknown[]) => void
error: (message: string, ...args: unknown[]) => void
debug: (message: string, ...args: unknown[]) => void
warn: (message: string, ...args: unknown[]) => void
}
function checkInternalApiKey(req: IncomingMessage): { success: boolean; error?: string } {
const apiKey = req.headers['x-api-key']
const expectedApiKey = env.INTERNAL_API_SECRET
if (!expectedApiKey) {
return { success: false, error: 'Internal API key not configured' }
}
if (!apiKey) {
return { success: false, error: 'API key required' }
}
if (apiKey !== expectedApiKey) {
return { success: false, error: 'Invalid API key' }
}
return { success: true }
}
function readRequestBody(req: IncomingMessage): Promise<string> {
return new Promise((resolve, reject) => {
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => resolve(body))
req.on('error', reject)
})
}
function sendSuccess(res: ServerResponse): void {
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
}
function sendError(res: ServerResponse, message: string, status = 500): void {
res.writeHead(status, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: message }))
info: (message: string, ...args: any[]) => void
error: (message: string, ...args: any[]) => void
debug: (message: string, ...args: any[]) => void
warn: (message: string, ...args: any[]) => void
}
/**
@@ -55,91 +14,101 @@ function sendError(res: ServerResponse, message: string, status = 500): void {
* @param logger - Logger instance for logging requests and errors
* @returns HTTP request handler function
*/
export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
return async (req: IncomingMessage, res: ServerResponse) => {
// Health check doesn't require auth
export function createHttpHandler(roomManager: RoomManager, logger: Logger) {
return (req: IncomingMessage, res: ServerResponse) => {
if (req.method === 'GET' && req.url === '/health') {
try {
const connections = await roomManager.getTotalActiveConnections()
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(
JSON.stringify({
status: 'ok',
timestamp: new Date().toISOString(),
connections,
})
)
} catch (error) {
logger.error('Error in health check:', error)
res.writeHead(503, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ status: 'error', message: 'Health check failed' }))
}
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(
JSON.stringify({
status: 'ok',
timestamp: new Date().toISOString(),
connections: roomManager.getTotalActiveConnections(),
})
)
return
}
// All POST endpoints require internal API key authentication
if (req.method === 'POST') {
const authResult = checkInternalApiKey(req)
if (!authResult.success) {
res.writeHead(401, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: authResult.error }))
return
}
}
// Handle workflow deletion notifications from the main API
if (req.method === 'POST' && req.url === '/api/workflow-deleted') {
try {
const body = await readRequestBody(req)
const { workflowId } = JSON.parse(body)
await roomManager.handleWorkflowDeletion(workflowId)
sendSuccess(res)
} catch (error) {
logger.error('Error handling workflow deletion notification:', error)
sendError(res, 'Failed to process deletion notification')
}
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => {
try {
const { workflowId } = JSON.parse(body)
roomManager.handleWorkflowDeletion(workflowId)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling workflow deletion notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process deletion notification' }))
}
})
return
}
// Handle workflow update notifications from the main API
if (req.method === 'POST' && req.url === '/api/workflow-updated') {
try {
const body = await readRequestBody(req)
const { workflowId } = JSON.parse(body)
await roomManager.handleWorkflowUpdate(workflowId)
sendSuccess(res)
} catch (error) {
logger.error('Error handling workflow update notification:', error)
sendError(res, 'Failed to process update notification')
}
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => {
try {
const { workflowId } = JSON.parse(body)
roomManager.handleWorkflowUpdate(workflowId)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling workflow update notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process update notification' }))
}
})
return
}
// Handle copilot workflow edit notifications from the main API
if (req.method === 'POST' && req.url === '/api/copilot-workflow-edit') {
try {
const body = await readRequestBody(req)
const { workflowId, description } = JSON.parse(body)
await roomManager.handleCopilotWorkflowEdit(workflowId, description)
sendSuccess(res)
} catch (error) {
logger.error('Error handling copilot workflow edit notification:', error)
sendError(res, 'Failed to process copilot edit notification')
}
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => {
try {
const { workflowId, description } = JSON.parse(body)
roomManager.handleCopilotWorkflowEdit(workflowId, description)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling copilot workflow edit notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process copilot edit notification' }))
}
})
return
}
// Handle workflow revert notifications from the main API
if (req.method === 'POST' && req.url === '/api/workflow-reverted') {
try {
const body = await readRequestBody(req)
const { workflowId, timestamp } = JSON.parse(body)
await roomManager.handleWorkflowRevert(workflowId, timestamp)
sendSuccess(res)
} catch (error) {
logger.error('Error handling workflow revert notification:', error)
sendError(res, 'Failed to process revert notification')
}
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => {
try {
const { workflowId, timestamp } = JSON.parse(body)
roomManager.handleWorkflowRevert(workflowId, timestamp)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling workflow revert notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process revert notification' }))
}
})
return
}
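
For reference, a hedged sketch of how the main API might call one of these authenticated endpoints; the socket server URL is an assumption, and only the x-api-key header and the INTERNAL_API_SECRET check come from the handler above.

// Notify the realtime server that a workflow changed, authenticating with the
// shared secret verified by checkInternalApiKey.
async function notifySocketServer(workflowId: string): Promise<void> {
  const res = await fetch('http://localhost:3002/api/workflow-updated', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'x-api-key': process.env.INTERNAL_API_SECRET ?? '',
    },
    body: JSON.stringify({ workflowId }),
  })
  if (!res.ok) {
    throw new Error(`Socket server responded with ${res.status}`)
  }
}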


@@ -239,3 +239,5 @@ export const WorkflowOperationSchema = z.union([
VariableOperationSchema,
WorkflowStateOperationSchema,
])
export { PositionSchema, AutoConnectEdgeSchema }
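
A brief, hedged example of validating an incoming payload against the exported union; the import path is an assumption, and the payload shape is left opaque because the member schemas are not shown in this hunk.

import { WorkflowOperationSchema } from '@/socket/validation/schemas' // path assumed

// Reject malformed operations before they reach the operation queue.
function parseWorkflowOperation(payload: unknown) {
  const result = WorkflowOperationSchema.safeParse(payload)
  if (!result.success) {
    throw new Error(`Invalid workflow operation: ${result.error.message}`)
  }
  return result.data
}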


@@ -4,19 +4,6 @@ import type { OperationQueueState, QueuedOperation } from './types'
const logger = createLogger('OperationQueue')
/** Timeout for subblock/variable operations before considering them failed */
const SUBBLOCK_VARIABLE_TIMEOUT_MS = 15000
/** Timeout for structural operations before considering them failed */
const STRUCTURAL_TIMEOUT_MS = 5000
/** Maximum retry attempts for subblock/variable operations */
const SUBBLOCK_VARIABLE_MAX_RETRIES = 5
/** Maximum retry attempts for structural operations */
const STRUCTURAL_MAX_RETRIES = 3
/** Maximum retry delay cap for subblock/variable operations */
const SUBBLOCK_VARIABLE_MAX_RETRY_DELAY_MS = 3000
/** Base retry delay multiplier (1s, 2s, 3s for linear) */
const RETRY_DELAY_BASE_MS = 1000
const retryTimeouts = new Map<string, NodeJS.Timeout>()
const operationTimeouts = new Map<string, NodeJS.Timeout>()
@@ -213,14 +200,14 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
(operation.operation.operation === 'variable-update' &&
operation.operation.target === 'variable')
const maxRetries = isSubblockOrVariable ? SUBBLOCK_VARIABLE_MAX_RETRIES : STRUCTURAL_MAX_RETRIES
const maxRetries = isSubblockOrVariable ? 5 : 3 // 5 retries for text, 3 for structural
if (operation.retryCount < maxRetries) {
const newRetryCount = operation.retryCount + 1
// Faster retries for subblock/variable, exponential for structural
const delay = isSubblockOrVariable
? Math.min(RETRY_DELAY_BASE_MS * newRetryCount, SUBBLOCK_VARIABLE_MAX_RETRY_DELAY_MS)
: 2 ** newRetryCount * RETRY_DELAY_BASE_MS
? Math.min(1000 * newRetryCount, 3000) // 1s, 2s, 3s, 3s, 3s (cap at 3s)
: 2 ** newRetryCount * 1000 // 2s, 4s, 8s (exponential for structural)
logger.warn(
`Operation failed, retrying in ${delay}ms (attempt ${newRetryCount}/${maxRetries})`,
@@ -322,9 +309,7 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
nextOperation.operation.target === 'subblock') ||
(nextOperation.operation.operation === 'variable-update' &&
nextOperation.operation.target === 'variable')
const timeoutDuration = isSubblockOrVariable
? SUBBLOCK_VARIABLE_TIMEOUT_MS
: STRUCTURAL_TIMEOUT_MS
const timeoutDuration = isSubblockOrVariable ? 15000 : 5000 // 15s for text edits, 5s for structural ops
const timeoutId = setTimeout(() => {
logger.warn(`Operation timeout - no server response after ${timeoutDuration}ms`, {
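
A standalone sketch of the retry schedule these constants and literals encode; the computeRetryDelay helper is illustrative and not part of the store.

const SUBBLOCK_VARIABLE_MAX_RETRY_DELAY_MS = 3000
const RETRY_DELAY_BASE_MS = 1000

// Subblock/variable edits retry linearly and cap at 3s (1s, 2s, 3s, 3s, 3s);
// structural operations back off exponentially (2s, 4s, 8s).
function computeRetryDelay(isSubblockOrVariable: boolean, retryCount: number): number {
  return isSubblockOrVariable
    ? Math.min(RETRY_DELAY_BASE_MS * retryCount, SUBBLOCK_VARIABLE_MAX_RETRY_DELAY_MS)
    : 2 ** retryCount * RETRY_DELAY_BASE_MS
}

// computeRetryDelay(true, 1)  === 1000, computeRetryDelay(true, 5)  === 3000
// computeRetryDelay(false, 1) === 2000, computeRetryDelay(false, 3) === 8000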


@@ -104,7 +104,6 @@
"@react-email/components": "^0.0.34",
"@react-email/render": "2.0.0",
"@sim/logger": "workspace:*",
"@socket.io/redis-adapter": "8.3.0",
"@t3-oss/env-nextjs": "0.13.4",
"@tanstack/react-query": "5.90.8",
"@tanstack/react-query-devtools": "5.90.2",
@@ -175,7 +174,6 @@
"react-simple-code-editor": "^0.14.1",
"react-window": "2.2.3",
"reactflow": "^11.11.4",
"redis": "5.10.0",
"rehype-autolink-headings": "^7.1.0",
"rehype-slug": "^6.0.0",
"remark-gfm": "4.0.1",
@@ -1148,16 +1146,6 @@
"@reactflow/node-toolbar": ["@reactflow/node-toolbar@1.3.14", "", { "dependencies": { "@reactflow/core": "11.11.4", "classcat": "^5.0.3", "zustand": "^4.4.1" }, "peerDependencies": { "react": ">=17", "react-dom": ">=17" } }, "sha512-rbynXQnH/xFNu4P9H+hVqlEUafDCkEoCy0Dg9mG22Sg+rY/0ck6KkrAQrYrTgXusd+cEJOMK0uOOFCK2/5rSGQ=="],
"@redis/bloom": ["@redis/bloom@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-doIF37ob+l47n0rkpRNgU8n4iacBlKM9xLiP1LtTZTvz8TloJB8qx/MgvhMhKdYG+CvCY2aPBnN2706izFn/4A=="],
"@redis/client": ["@redis/client@5.10.0", "", { "dependencies": { "cluster-key-slot": "1.1.2" } }, "sha512-JXmM4XCoso6C75Mr3lhKA3eNxSzkYi3nCzxDIKY+YOszYsJjuKbFgVtguVPbLMOttN4iu2fXoc2BGhdnYhIOxA=="],
"@redis/json": ["@redis/json@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-B2G8XlOmTPUuZtD44EMGbtoepQG34RCDXLZbjrtON1Djet0t5Ri7/YPXvL9aomXqP8lLTreaprtyLKF4tmXEEA=="],
"@redis/search": ["@redis/search@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-3SVcPswoSfp2HnmWbAGUzlbUPn7fOohVu2weUQ0S+EMiQi8jwjL+aN2p6V3TI65eNfVsJ8vyPvqWklm6H6esmg=="],
"@redis/time-series": ["@redis/time-series@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-cPkpddXH5kc/SdRhF0YG0qtjL+noqFT0AcHbQ6axhsPsO7iqPi1cjxgdkE9TNeKiBUUdCaU1DbqkR/LzbzPBhg=="],
"@resvg/resvg-wasm": ["@resvg/resvg-wasm@2.4.0", "", {}, "sha512-C7c51Nn4yTxXFKvgh2txJFNweaVcfUPQxwEUFw4aWsCmfiBDJsTSwviIF8EcwjQ6k8bPyMWCl1vw4BdxE569Cg=="],
"@rolldown/pluginutils": ["@rolldown/pluginutils@1.0.0-beta.27", "", {}, "sha512-+d0F4MKMCbeVUJwG96uQ4SgAznZNSq93I3V+9NHA4OpvqG8mRCpGdKmK8l/dl02h2CCDHwW2FqilnTyDcAnqjA=="],
@@ -1352,8 +1340,6 @@
"@socket.io/component-emitter": ["@socket.io/component-emitter@3.1.2", "", {}, "sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA=="],
"@socket.io/redis-adapter": ["@socket.io/redis-adapter@8.3.0", "", { "dependencies": { "debug": "~4.3.1", "notepack.io": "~3.0.1", "uid2": "1.0.0" }, "peerDependencies": { "socket.io-adapter": "^2.5.4" } }, "sha512-ly0cra+48hDmChxmIpnESKrc94LjRL80TEmZVscuQ/WWkRP81nNj8W8cCGMqbI4L6NCuAaPRSzZF1a9GlAxxnA=="],
"@standard-schema/spec": ["@standard-schema/spec@1.1.0", "", {}, "sha512-l2aFy5jALhniG5HgqrD6jXLi/rUWrKvqN/qJx6yoJsgKhblVd+iqqU4RCXavm/jPityDo5TCvKMnpjKnOriy0w=="],
"@standard-schema/utils": ["@standard-schema/utils@0.3.0", "", {}, "sha512-e7Mew686owMaPJVNNLs55PUvgz371nKgwsc4vxE49zsODpJEnxgxRo2y/OKrqueavXgZNMDVj3DdHFlaSAeU8g=="],
@@ -2816,8 +2802,6 @@
"normalize-range": ["normalize-range@0.1.2", "", {}, "sha512-bdok/XvKII3nUpklnV6P2hxtMNrCboOjAcyBuQnWEhO665FwrSNRxU+AqpsyvO6LgGYPspN+lu5CLtw4jPRKNA=="],
"notepack.io": ["notepack.io@3.0.1", "", {}, "sha512-TKC/8zH5pXIAMVQio2TvVDTtPRX+DJPHDqjRbxogtFiByHyzKmy96RA0JtCQJ+WouyyL4A10xomQzgbUT+1jCg=="],
"npm-run-path": ["npm-run-path@5.3.0", "", { "dependencies": { "path-key": "^4.0.0" } }, "sha512-ppwTtiJZq0O/ai0z7yfudtBpWIoxM8yE6nHi1X47eFR2EWORqfbu6CnPlNsjeN683eT0qG6H/Pyf9fCcvjnnnQ=="],
"npm-to-yarn": ["npm-to-yarn@3.0.1", "", {}, "sha512-tt6PvKu4WyzPwWUzy/hvPFqn+uwXO0K1ZHka8az3NnrhWJDmSqI8ncWq0fkL0k/lmmi5tAC11FXwXuh0rFbt1A=="],
@@ -3088,8 +3072,6 @@
"redent": ["redent@3.0.0", "", { "dependencies": { "indent-string": "^4.0.0", "strip-indent": "^3.0.0" } }, "sha512-6tDA8g98We0zd0GvVeMT9arEOnTw9qM03L9cJXaCjrip1OO764RDBLBfrB4cwzNGDj5OA5ioymC9GkizgWJDUg=="],
"redis": ["redis@5.10.0", "", { "dependencies": { "@redis/bloom": "5.10.0", "@redis/client": "5.10.0", "@redis/json": "5.10.0", "@redis/search": "5.10.0", "@redis/time-series": "5.10.0" } }, "sha512-0/Y+7IEiTgVGPrLFKy8oAEArSyEJkU0zvgV5xyi9NzNQ+SLZmyFbUsWIbgPcd4UdUh00opXGKlXJwMmsis5Byw=="],
"redis-errors": ["redis-errors@1.2.0", "", {}, "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w=="],
"redis-parser": ["redis-parser@3.0.0", "", { "dependencies": { "redis-errors": "^1.0.0" } }, "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A=="],
@@ -3452,8 +3434,6 @@
"ufo": ["ufo@1.6.3", "", {}, "sha512-yDJTmhydvl5lJzBmy/hyOAA0d+aqCBuwl818haVdYCRrWV84o7YyeVm4QlVHStqNrrJSTb6jKuFAVqAFsr+K3Q=="],
"uid2": ["uid2@1.0.0", "", {}, "sha512-+I6aJUv63YAcY9n4mQreLUt0d4lvwkkopDNmpomkAUz0fAkEMV9pRWxN0EjhW1YfRhcuyHg2v3mwddCDW1+LFQ=="],
"ulid": ["ulid@2.4.0", "", { "bin": { "ulid": "bin/cli.js" } }, "sha512-fIRiVTJNcSRmXKPZtGzFQv9WRrZ3M9eoptl/teFJvjOzmpU+/K/JH6HZ8deBfb5vMEpicJcLn7JmvdknlMq7Zg=="],
"unbzip2-stream": ["unbzip2-stream@1.4.3", "", { "dependencies": { "buffer": "^5.2.1", "through": "^2.3.8" } }, "sha512-mlExGW4w71ebDJviH16lQLtZS32VKqsSfk80GCfUlwT/4/hNRFsoscrF/c++9xinkMzECL1uL9DDwXqFWkruPg=="],
@@ -3872,8 +3852,6 @@
"@shuding/opentype.js/fflate": ["fflate@0.7.4", "", {}, "sha512-5u2V/CDW15QM1XbbgS+0DfPxVB+jUKhWEKuuFuHncbk3tEEqzmoXL+2KyOFuKGqOnmdIy0/davWF1CkuwtibCw=="],
"@socket.io/redis-adapter/debug": ["debug@4.3.7", "", { "dependencies": { "ms": "^2.1.3" } }, "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ=="],
"@tailwindcss/node/jiti": ["jiti@2.6.1", "", { "bin": { "jiti": "lib/jiti-cli.mjs" } }, "sha512-ekilCSN1jwRvIbgeg/57YFh8qQDNbwDb9xT/qu2DAHbFFZUicIl4ygVaAvzveMhMVr3LnpSKTNnwt8PoOfmKhQ=="],
"@tailwindcss/oxide-wasm32-wasi/@emnapi/core": ["@emnapi/core@1.8.1", "", { "dependencies": { "@emnapi/wasi-threads": "1.1.0", "tslib": "^2.4.0" }, "bundled": true }, "sha512-AvT9QFpxK0Zd8J0jopedNm+w/2fIzvtPKPjqyw9jwvBaReTTqPBk9Hixaz7KbjimP+QNz605/XnjFcDAL2pqBg=="],


@@ -44,10 +44,6 @@ spec:
env:
- name: DATABASE_URL
value: {{ include "sim.databaseUrl" . | quote }}
{{- if .Values.app.env.REDIS_URL }}
- name: REDIS_URL
value: {{ .Values.app.env.REDIS_URL | quote }}
{{- end }}
{{- range $key, $value := .Values.realtime.env }}
- name: {{ $key }}
value: {{ $value | quote }}