Compare commits

...

20 Commits

Author SHA1 Message Date
Vikhyath Mondreti
efe2b85346 fix bugbot comments 2026-01-29 20:15:34 -08:00
Vikhyath Mondreti
49a6197cd2 cleanup code 2026-01-29 19:56:36 -08:00
Vikhyath Mondreti
175b72899c fix 2026-01-29 19:28:53 -08:00
Vikhyath Mondreti
427e3b9417 null cursor 2026-01-29 19:16:37 -08:00
Vikhyath Mondreti
2b248104e6 fix more bugbot cleanup comments 2026-01-29 19:01:32 -08:00
Vikhyath Mondreti
df1a951e98 bugbot comment 2026-01-29 17:45:24 -08:00
Vikhyath Mondreti
1fbe3029f4 fix bugbot comments 2026-01-29 17:30:19 -08:00
Vikhyath Mondreti
e5d9b98909 use native api 2026-01-29 17:08:44 -08:00
Vikhyath Mondreti
d00ed958cc improve typing 2026-01-29 17:00:33 -08:00
Vikhyath Mondreti
7d23e2363d remove random error code 2026-01-29 16:56:56 -08:00
Vikhyath Mondreti
fc1ca1e36b improvment(sockets): migrate to redis 2026-01-29 16:50:38 -08:00
Siddharth Ganesan
2b026ded16 fix(copilot): hosted api key validation + credential validation (#3000)
* Fix

* Fix greptile

* Fix validation

* Fix comments

* Lint

* Fix

* remove passed in workspace id ref

* Fix comments

---------

Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
2026-01-29 10:48:59 -08:00
Siddharth Ganesan
dca0758054 fix(executor): conditional deactivation for loops/parallels (#3069)
* Fix deactivation

* Remove comments
2026-01-29 10:43:30 -08:00
Waleed
ae17c90bdf chore(readme): update readme.md (#3066) 2026-01-28 23:51:34 -08:00
Waleed
1256a15266 fix(posthog): move session recording proxy to middleware for large payload support (#3065)
Next.js rewrites can strip request bodies for large payloads (1MB+),
causing 400 errors from CloudFront. PostHog session recordings require
up to 64MB per message. Moving the proxy to middleware ensures proper
body passthrough.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 23:49:57 -08:00
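The middleware approach described in this commit can be sketched roughly as follows. This is a hedged illustration, not the repo's actual code: it models the handler with the standard fetch `Request`/`Response` types rather than Next's `NextRequest`/`NextResponse`, and the `/ingest` path prefix and PostHog ingestion host are assumptions.

```typescript
// Sketch: proxy PostHog session-recording traffic from middleware so the
// request body streams through intact (Next.js rewrites can drop large bodies).
// The "/ingest" prefix and upstream host below are assumptions, not the
// repository's real values.

const POSTHOG_HOST = "https://us.i.posthog.com"; // hypothetical upstream

// Pure predicate so the routing decision is easy to unit test.
export function shouldProxyToPosthog(pathname: string): boolean {
  return pathname === "/ingest" || pathname.startsWith("/ingest/");
}

export async function middleware(request: Request): Promise<Response | undefined> {
  const url = new URL(request.url);
  if (!shouldProxyToPosthog(url.pathname)) {
    return undefined; // fall through to the rest of the app
  }
  // Re-target the request at PostHog while streaming the original body.
  const upstream = new URL(url.pathname.replace(/^\/ingest/, "") + url.search, POSTHOG_HOST);
  return fetch(upstream, {
    method: request.method,
    headers: request.headers,
    body: request.body,
    // Node's fetch requires half-duplex mode when forwarding a streamed body.
    duplex: "half",
  } as RequestInit & { duplex: "half" });
}
```

Because middleware runs before Next's rewrite machinery, the body is handed to `fetch` as a stream instead of being buffered (and possibly truncated) by a rewrite.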
Waleed
0b2b7ed9c8 fix(oauth): use createElement for icon components to fix React hooks error (#3064) 2026-01-28 23:37:00 -08:00
Vikhyath Mondreti
0d8d9fb238 fix(type): logs workspace delivery (#3063) 2026-01-28 21:54:20 -08:00
Vikhyath Mondreti
e0f1e66f4f feat(child-workflows): nested execution snapshots (#3059)
* feat(child-workflows): nested execution snapshots

* cleanup typing

* address bugbot comments and fix tests

* do not cascade delete logs/snapshots

* fix few more inconsitencies

* fix external logs route

* add fallback color
2026-01-28 19:40:52 -08:00
Emir Karabeg
20bb7cdec6 improvement(preview): include current workflow badge in breadcrumb in workflow snapshot (#3062)
* feat(preview): add workflow context badge for nested navigation

Adds a badge next to the Back button when viewing nested workflows
to help users identify which workflow they are currently viewing.
This is especially helpful when navigating deeply into nested
workflow blocks.

Changes:
- Added workflowName field to WorkflowStackEntry interface
- Capture workflow name from metadata when drilling down
- Display workflow name badge next to Back button

Co-authored-by: emir <emir@simstudio.ai>

* added workflow name and desc to metadata for workflow preview

* added copy and search icon in code in preview editor

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: waleed <walif6@gmail.com>
2026-01-28 19:33:19 -08:00
Waleed
1469e9c66c feat(youtube): add captions, trending, and video categories tools with enhanced API coverage (#3060)
* feat(youtube): add captions, trending, and video categories tools with enhanced API coverage

* fix(youtube): remove captions tool (requires OAuth), fix tinybird defaults, encode pageToken
2026-01-28 19:08:33 -08:00
86 changed files with 14153 additions and 1370 deletions

View File

@@ -172,31 +172,6 @@ Key environment variables for self-hosted deployments. See [`.env.example`](apps
| `API_ENCRYPTION_KEY` | Yes | Encrypts API keys (`openssl rand -hex 32`) |
| `COPILOT_API_KEY` | No | API key from sim.ai for Copilot features |
## Troubleshooting
### Ollama models not showing in dropdown (Docker)
If you're running Ollama on your host machine and Sim in Docker, change `OLLAMA_URL` from `localhost` to `host.docker.internal`:
```bash
OLLAMA_URL=http://host.docker.internal:11434 docker compose -f docker-compose.prod.yml up -d
```
See [Using an External Ollama Instance](#using-an-external-ollama-instance) for details.
### Database connection issues
Ensure PostgreSQL has the pgvector extension installed. When using Docker, wait for the database to be healthy before running migrations.
### Port conflicts
If ports 3000, 3002, or 5432 are in use, configure alternatives:
```bash
# Custom ports
NEXT_PUBLIC_APP_URL=http://localhost:3100 POSTGRES_PORT=5433 docker compose up -d
```
## Tech Stack
- **Framework**: [Next.js](https://nextjs.org/) (App Router)

View File

@@ -26,78 +26,41 @@ In Sim, the YouTube integration enables your agents to programmatically search a
## Usage Instructions
Integrate YouTube into the workflow. Can search for videos, get video details, get channel information, get all videos from a channel, get channel playlists, get playlist items, find related videos, and get video comments.
Integrate YouTube into the workflow. Can search for videos, get trending videos, get video details, get video captions, get video categories, get channel information, get all videos from a channel, get channel playlists, get playlist items, and get video comments.
## Tools
### `youtube_search`
### `youtube_captions`
Search for videos on YouTube using the YouTube Data API. Supports advanced filtering by channel, date range, duration, category, quality, captions, and more.
List available caption tracks (subtitles/transcripts) for a YouTube video. Returns information about each caption including language, type, and whether it is auto-generated.
#### Input
| Parameter | Type | Required | Description |
| --------- | ---- | -------- | ----------- |
| `query` | string | Yes | Search query for YouTube videos |
| `maxResults` | number | No | Maximum number of videos to return \(1-50\) |
| `apiKey` | string | Yes | YouTube API Key |
| `channelId` | string | No | Filter results to a specific YouTube channel ID |
| `publishedAfter` | string | No | Only return videos published after this date \(RFC 3339 format: "2024-01-01T00:00:00Z"\) |
| `publishedBefore` | string | No | Only return videos published before this date \(RFC 3339 format: "2024-01-01T00:00:00Z"\) |
| `videoDuration` | string | No | Filter by video length: "short" \(&lt;4 min\), "medium" \(4-20 min\), "long" \(&gt;20 min\), "any" |
| `order` | string | No | Sort results by: "date", "rating", "relevance" \(default\), "title", "videoCount", "viewCount" |
| `videoCategoryId` | string | No | Filter by YouTube category ID \(e.g., "10" for Music, "20" for Gaming\) |
| `videoDefinition` | string | No | Filter by video quality: "high" \(HD\), "standard", "any" |
| `videoCaption` | string | No | Filter by caption availability: "closedCaption" \(has captions\), "none" \(no captions\), "any" |
| `regionCode` | string | No | Return results relevant to a specific region \(ISO 3166-1 alpha-2 country code, e.g., "US", "GB"\) |
| `relevanceLanguage` | string | No | Return results most relevant to a language \(ISO 639-1 code, e.g., "en", "es"\) |
| `safeSearch` | string | No | Content filtering level: "moderate" \(default\), "none", "strict" |
#### Output
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| `items` | array | Array of YouTube videos matching the search query |
| ↳ `videoId` | string | YouTube video ID |
| ↳ `title` | string | Video title |
| ↳ `description` | string | Video description |
| ↳ `thumbnail` | string | Video thumbnail URL |
| `totalResults` | number | Total number of search results available |
| `nextPageToken` | string | Token for accessing the next page of results |
### `youtube_video_details`
Get detailed information about a specific YouTube video.
#### Input
| Parameter | Type | Required | Description |
| --------- | ---- | -------- | ----------- |
| `videoId` | string | Yes | YouTube video ID |
| `videoId` | string | Yes | YouTube video ID to get captions for |
| `apiKey` | string | Yes | YouTube API Key |
#### Output
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| `videoId` | string | YouTube video ID |
| `title` | string | Video title |
| `description` | string | Video description |
| `channelId` | string | Channel ID |
| `channelTitle` | string | Channel name |
| `publishedAt` | string | Published date and time |
| `duration` | string | Video duration in ISO 8601 format |
| `viewCount` | number | Number of views |
| `likeCount` | number | Number of likes |
| `commentCount` | number | Number of comments |
| `thumbnail` | string | Video thumbnail URL |
| `tags` | array | Video tags |
| `items` | array | Array of available caption tracks for the video |
| ↳ `captionId` | string | Caption track ID |
| ↳ `language` | string | Language code of the caption \(e.g., |
| ↳ `name` | string | Name/label of the caption track |
| ↳ `trackKind` | string | Type of caption track: |
| ↳ `lastUpdated` | string | When the caption was last updated |
| ↳ `isCC` | boolean | Whether this is a closed caption track |
| ↳ `isAutoSynced` | boolean | Whether the caption timing was automatically synced |
| ↳ `audioTrackType` | string | Type of audio track this caption is for |
| `totalResults` | number | Total number of caption tracks available |
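A minimal sketch of the Data API call behind this tool. The endpoint and parameters are the public `captions.list` API; the helper name is ours, not part of the integration.

```typescript
// Build a YouTube Data API v3 captions.list request URL for a video.
// Endpoint/params are the public API's; the helper itself is a sketch.
export function buildCaptionsUrl(videoId: string, apiKey: string): string {
  const params = new URLSearchParams({
    part: "snippet",
    videoId,
    key: apiKey,
  });
  return `https://www.googleapis.com/youtube/v3/captions?${params}`;
}

// Usage (assumes YOUTUBE_API_KEY is set):
//   const res = await fetch(buildCaptionsUrl("dQw4w9WgXcQ", process.env.YOUTUBE_API_KEY!));
```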
### `youtube_channel_info`
Get detailed information about a YouTube channel.
Get detailed information about a YouTube channel including statistics, branding, and content details.
#### Input
@@ -114,43 +77,20 @@ Get detailed information about a YouTube channel.
| `channelId` | string | YouTube channel ID |
| `title` | string | Channel name |
| `description` | string | Channel description |
| `subscriberCount` | number | Number of subscribers |
| `videoCount` | number | Number of videos |
| `subscriberCount` | number | Number of subscribers \(0 if hidden\) |
| `videoCount` | number | Number of public videos |
| `viewCount` | number | Total channel views |
| `publishedAt` | string | Channel creation date |
| `thumbnail` | string | Channel thumbnail URL |
| `customUrl` | string | Channel custom URL |
### `youtube_channel_videos`
Get all videos from a specific YouTube channel, with sorting options.
#### Input
| Parameter | Type | Required | Description |
| --------- | ---- | -------- | ----------- |
| `channelId` | string | Yes | YouTube channel ID to get videos from |
| `maxResults` | number | No | Maximum number of videos to return \(1-50\) |
| `order` | string | No | Sort order: "date" \(newest first\), "rating", "relevance", "title", "viewCount" |
| `pageToken` | string | No | Page token for pagination |
| `apiKey` | string | Yes | YouTube API Key |
#### Output
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| `items` | array | Array of videos from the channel |
| ↳ `videoId` | string | YouTube video ID |
| ↳ `title` | string | Video title |
| ↳ `description` | string | Video description |
| ↳ `thumbnail` | string | Video thumbnail URL |
| ↳ `publishedAt` | string | Video publish date |
| `totalResults` | number | Total number of videos in the channel |
| `nextPageToken` | string | Token for accessing the next page of results |
| `thumbnail` | string | Channel thumbnail/avatar URL |
| `customUrl` | string | Channel custom URL \(handle\) |
| `country` | string | Country the channel is associated with |
| `uploadsPlaylistId` | string | Playlist ID containing all channel uploads \(use with playlist_items\) |
| `bannerImageUrl` | string | Channel banner image URL |
| `hiddenSubscriberCount` | boolean | Whether the subscriber count is hidden |
### `youtube_channel_playlists`
Get all playlists from a specific YouTube channel.
Get all public playlists from a specific YouTube channel.
#### Input
@@ -172,19 +112,80 @@ Get all playlists from a specific YouTube channel.
| ↳ `thumbnail` | string | Playlist thumbnail URL |
| ↳ `itemCount` | number | Number of videos in playlist |
| ↳ `publishedAt` | string | Playlist creation date |
| ↳ `channelTitle` | string | Channel name |
| `totalResults` | number | Total number of playlists in the channel |
| `nextPageToken` | string | Token for accessing the next page of results |
### `youtube_playlist_items`
### `youtube_channel_videos`
Get videos from a YouTube playlist.
Search for videos from a specific YouTube channel, with sorting options. For a complete list of a channel's videos, use channel_info to get uploadsPlaylistId, then use playlist_items.
#### Input
| Parameter | Type | Required | Description |
| --------- | ---- | -------- | ----------- |
| `playlistId` | string | Yes | YouTube playlist ID |
| `maxResults` | number | No | Maximum number of videos to return |
| `channelId` | string | Yes | YouTube channel ID to get videos from |
| `maxResults` | number | No | Maximum number of videos to return \(1-50\) |
| `order` | string | No | Sort order: "date" \(newest first, default\), "rating", "relevance", "title", "viewCount" |
| `pageToken` | string | No | Page token for pagination |
| `apiKey` | string | Yes | YouTube API Key |
#### Output
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| `items` | array | Array of videos from the channel |
| ↳ `videoId` | string | YouTube video ID |
| ↳ `title` | string | Video title |
| ↳ `description` | string | Video description |
| ↳ `thumbnail` | string | Video thumbnail URL |
| ↳ `publishedAt` | string | Video publish date |
| ↳ `channelTitle` | string | Channel name |
| `totalResults` | number | Total number of videos in the channel |
| `nextPageToken` | string | Token for accessing the next page of results |
### `youtube_comments`
Get top-level comments from a YouTube video with author details and engagement.
#### Input
| Parameter | Type | Required | Description |
| --------- | ---- | -------- | ----------- |
| `videoId` | string | Yes | YouTube video ID |
| `maxResults` | number | No | Maximum number of comments to return \(1-100\) |
| `order` | string | No | Order of comments: "time" \(newest first\) or "relevance" \(most relevant first\) |
| `pageToken` | string | No | Page token for pagination |
| `apiKey` | string | Yes | YouTube API Key |
#### Output
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| `items` | array | Array of top-level comments from the video |
| ↳ `commentId` | string | Comment ID |
| ↳ `authorDisplayName` | string | Comment author display name |
| ↳ `authorChannelUrl` | string | Comment author channel URL |
| ↳ `authorProfileImageUrl` | string | Comment author profile image URL |
| ↳ `textDisplay` | string | Comment text \(HTML formatted\) |
| ↳ `textOriginal` | string | Comment text \(plain text\) |
| ↳ `likeCount` | number | Number of likes on the comment |
| ↳ `publishedAt` | string | When the comment was posted |
| ↳ `updatedAt` | string | When the comment was last edited |
| ↳ `replyCount` | number | Number of replies to this comment |
| `totalResults` | number | Total number of comment threads available |
| `nextPageToken` | string | Token for accessing the next page of results |
### `youtube_playlist_items`
Get videos from a YouTube playlist. Can be used with a channel's uploads playlist to get all of that channel's videos.
#### Input
| Parameter | Type | Required | Description |
| --------- | ---- | -------- | ----------- |
| `playlistId` | string | Yes | YouTube playlist ID. Use uploadsPlaylistId from channel_info to get all channel videos. |
| `maxResults` | number | No | Maximum number of videos to return \(1-50\) |
| `pageToken` | string | No | Page token for pagination |
| `apiKey` | string | Yes | YouTube API Key |
@@ -198,22 +199,65 @@ Get videos from a YouTube playlist.
| ↳ `description` | string | Video description |
| ↳ `thumbnail` | string | Video thumbnail URL |
| ↳ `publishedAt` | string | Date added to playlist |
| ↳ `channelTitle` | string | Channel name |
| ↳ `position` | number | Position in playlist |
| ↳ `channelTitle` | string | Playlist owner channel name |
| ↳ `position` | number | Position in playlist \(0-indexed\) |
| ↳ `videoOwnerChannelId` | string | Channel ID of the video owner |
| ↳ `videoOwnerChannelTitle` | string | Channel name of the video owner |
| `totalResults` | number | Total number of items in playlist |
| `nextPageToken` | string | Token for accessing the next page of results |
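The channel_info → uploadsPlaylistId → playlist_items flow can be sketched as a pagination loop over `nextPageToken`. The endpoint and parameters mirror the public `playlistItems.list` API; the fetcher is injected (a thin wrapper over `fetch` in practice) so the loop itself is testable, and all names here are ours.

```typescript
// Sketch: page through playlistItems.list with nextPageToken until exhausted.
type Page = { items: { videoId: string }[]; nextPageToken?: string };

export async function fetchAllPlaylistItems(
  playlistId: string,
  apiKey: string,
  fetchJson: (url: string) => Promise<Page>
): Promise<string[]> {
  const ids: string[] = [];
  let pageToken: string | undefined;
  do {
    const params = new URLSearchParams({
      part: "snippet",
      playlistId,
      maxResults: "50", // API maximum per page
      key: apiKey,
    });
    // URLSearchParams percent-encodes the token, which can contain
    // URL-unsafe characters.
    if (pageToken) params.set("pageToken", pageToken);
    const page = await fetchJson(`https://www.googleapis.com/youtube/v3/playlistItems?${params}`);
    for (const item of page.items) ids.push(item.videoId);
    pageToken = page.nextPageToken;
  } while (pageToken);
  return ids;
}
```

Passing the uploadsPlaylistId returned by channel_info as `playlistId` walks the channel's full upload history, which search-based channel listing cannot guarantee.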
### `youtube_comments`
### `youtube_search`
Get comments from a YouTube video.
Search for videos on YouTube using the YouTube Data API. Supports advanced filtering by channel, date range, duration, category, quality, captions, live streams, and more.
#### Input
| Parameter | Type | Required | Description |
| --------- | ---- | -------- | ----------- |
| `videoId` | string | Yes | YouTube video ID |
| `maxResults` | number | No | Maximum number of comments to return |
| `order` | string | No | Order of comments: time or relevance |
| `query` | string | Yes | Search query for YouTube videos |
| `maxResults` | number | No | Maximum number of videos to return \(1-50\) |
| `pageToken` | string | No | Page token for pagination \(use nextPageToken from previous response\) |
| `apiKey` | string | Yes | YouTube API Key |
| `channelId` | string | No | Filter results to a specific YouTube channel ID |
| `publishedAfter` | string | No | Only return videos published after this date \(RFC 3339 format: "2024-01-01T00:00:00Z"\) |
| `publishedBefore` | string | No | Only return videos published before this date \(RFC 3339 format: "2024-01-01T00:00:00Z"\) |
| `videoDuration` | string | No | Filter by video length: "short" \(&lt;4 min\), "medium" \(4-20 min\), "long" \(&gt;20 min\), "any" |
| `order` | string | No | Sort results by: "date", "rating", "relevance" \(default\), "title", "videoCount", "viewCount" |
| `videoCategoryId` | string | No | Filter by YouTube category ID \(e.g., "10" for Music, "20" for Gaming\). Use video_categories to list IDs. |
| `videoDefinition` | string | No | Filter by video quality: "high" \(HD\), "standard", "any" |
| `videoCaption` | string | No | Filter by caption availability: "closedCaption" \(has captions\), "none" \(no captions\), "any" |
| `eventType` | string | No | Filter by live broadcast status: "live" \(currently live\), "upcoming" \(scheduled\), "completed" \(past streams\) |
| `regionCode` | string | No | Return results relevant to a specific region \(ISO 3166-1 alpha-2 country code, e.g., "US", "GB"\) |
| `relevanceLanguage` | string | No | Return results most relevant to a language \(ISO 639-1 code, e.g., "en", "es"\) |
| `safeSearch` | string | No | Content filtering level: "moderate" \(default\), "none", "strict" |
#### Output
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| `items` | array | Array of YouTube videos matching the search query |
| ↳ `videoId` | string | YouTube video ID |
| ↳ `title` | string | Video title |
| ↳ `description` | string | Video description |
| ↳ `thumbnail` | string | Video thumbnail URL |
| ↳ `channelId` | string | Channel ID that uploaded the video |
| ↳ `channelTitle` | string | Channel name |
| ↳ `publishedAt` | string | Video publish date |
| ↳ `liveBroadcastContent` | string | Live broadcast status: |
| `totalResults` | number | Total number of search results available |
| `nextPageToken` | string | Token for accessing the next page of results |
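Assembling a `search.list` request with optional filters can be sketched as below. The parameter names match the public Data API; the helper and its filter subset are our illustration.

```typescript
// Sketch: build a search.list URL, including only the filters that are set.
export function buildSearchUrl(
  apiKey: string,
  query: string,
  filters: Partial<
    Record<"channelId" | "publishedAfter" | "videoDuration" | "order" | "eventType" | "pageToken", string>
  > = {}
): string {
  const params = new URLSearchParams({ part: "snippet", type: "video", q: query, key: apiKey });
  for (const [name, value] of Object.entries(filters)) {
    // URLSearchParams percent-encodes values, including pageToken.
    if (value) params.set(name, value);
  }
  return `https://www.googleapis.com/youtube/v3/search?${params}`;
}
```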
### `youtube_trending`
Get the most popular/trending videos on YouTube. Can filter by region and video category.
#### Input
| Parameter | Type | Required | Description |
| --------- | ---- | -------- | ----------- |
| `regionCode` | string | No | ISO 3166-1 alpha-2 country code to get trending videos for \(e.g., "US", "GB", "JP"\). Defaults to US. |
| `videoCategoryId` | string | No | Filter by video category ID \(e.g., "10" for Music, "20" for Gaming, "17" for Sports\) |
| `maxResults` | number | No | Maximum number of trending videos to return \(1-50\) |
| `pageToken` | string | No | Page token for pagination |
| `apiKey` | string | Yes | YouTube API Key |
@@ -221,17 +265,84 @@ Get comments from a YouTube video.
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| `items` | array | Array of comments from the video |
| ↳ `commentId` | string | Comment ID |
| ↳ `authorDisplayName` | string | Comment author name |
| ↳ `authorChannelUrl` | string | Comment author channel URL |
| ↳ `textDisplay` | string | Comment text \(HTML formatted\) |
| ↳ `textOriginal` | string | Comment text \(plain text\) |
| `items` | array | Array of trending videos |
| ↳ `videoId` | string | YouTube video ID |
| ↳ `title` | string | Video title |
| ↳ `description` | string | Video description |
| ↳ `thumbnail` | string | Video thumbnail URL |
| ↳ `channelId` | string | Channel ID |
| ↳ `channelTitle` | string | Channel name |
| ↳ `publishedAt` | string | Video publish date |
| ↳ `viewCount` | number | Number of views |
| ↳ `likeCount` | number | Number of likes |
| ↳ `publishedAt` | string | Comment publish date |
| ↳ `updatedAt` | string | Comment last updated date |
| ↳ `replyCount` | number | Number of replies |
| `totalResults` | number | Total number of comments |
| ↳ `commentCount` | number | Number of comments |
| ↳ `duration` | string | Video duration in ISO 8601 format |
| `totalResults` | number | Total number of trending videos available |
| `nextPageToken` | string | Token for accessing the next page of results |
### `youtube_video_categories`
Get a list of video categories available on YouTube. Use this to discover valid category IDs for filtering search and trending results.
#### Input
| Parameter | Type | Required | Description |
| --------- | ---- | -------- | ----------- |
| `regionCode` | string | No | ISO 3166-1 alpha-2 country code to get categories for \(e.g., "US", "GB", "JP"\). Defaults to US. |
| `hl` | string | No | Language for category titles \(e.g., "en", "es", "fr"\). Defaults to English. |
| `apiKey` | string | Yes | YouTube API Key |
#### Output
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| `items` | array | Array of video categories available in the specified region |
| ↳ `categoryId` | string | Category ID to use in search/trending filters \(e.g., |
| ↳ `title` | string | Human-readable category name |
| ↳ `assignable` | boolean | Whether videos can be tagged with this category |
| `totalResults` | number | Total number of categories available |
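A small sketch of how the categories output pairs with the `videoCategoryId` filter on search/trending: collapse the items into an id → title lookup, keeping only assignable categories. The item shape follows the output table above; the helper name is ours.

```typescript
// Sketch: turn videoCategories.list items into a categoryId -> title map
// for use with the videoCategoryId filter.
type Category = { categoryId: string; title: string; assignable: boolean };

export function toCategoryLookup(items: Category[]): Record<string, string> {
  return items.reduce<Record<string, string>>((acc, c) => {
    // Skip categories that videos cannot actually be tagged with.
    if (c.assignable) acc[c.categoryId] = c.title;
    return acc;
  }, {});
}
```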
### `youtube_video_details`
Get detailed information about a specific YouTube video including statistics, content details, live streaming info, and metadata.
#### Input
| Parameter | Type | Required | Description |
| --------- | ---- | -------- | ----------- |
| `videoId` | string | Yes | YouTube video ID |
| `apiKey` | string | Yes | YouTube API Key |
#### Output
| Parameter | Type | Description |
| --------- | ---- | ----------- |
| `videoId` | string | YouTube video ID |
| `title` | string | Video title |
| `description` | string | Video description |
| `channelId` | string | Channel ID |
| `channelTitle` | string | Channel name |
| `publishedAt` | string | Published date and time |
| `duration` | string | Video duration in ISO 8601 format \(e.g., |
| `viewCount` | number | Number of views |
| `likeCount` | number | Number of likes |
| `commentCount` | number | Number of comments |
| `favoriteCount` | number | Number of times added to favorites |
| `thumbnail` | string | Video thumbnail URL |
| `tags` | array | Video tags |
| `categoryId` | string | YouTube video category ID |
| `definition` | string | Video definition: |
| `caption` | string | Whether captions are available: |
| `licensedContent` | boolean | Whether the video is licensed content |
| `privacyStatus` | string | Video privacy status: |
| `liveBroadcastContent` | string | Live broadcast status: |
| `defaultLanguage` | string | Default language of the video metadata |
| `defaultAudioLanguage` | string | Default audio language of the video |
| `isLiveContent` | boolean | Whether this video is or was a live stream |
| `scheduledStartTime` | string | Scheduled start time for upcoming live streams \(ISO 8601\) |
| `actualStartTime` | string | When the live stream actually started \(ISO 8601\) |
| `actualEndTime` | string | When the live stream ended \(ISO 8601\) |
| `concurrentViewers` | number | Current number of viewers \(only for active live streams\) |
| `activeLiveChatId` | string | Live chat ID for the stream \(only for active live streams\) |
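Since `duration` comes back as an ISO 8601 duration string, a converter to seconds is often the first thing a workflow needs. This helper is ours, not part of the integration; it handles the hours/minutes/seconds components plus the day component YouTube uses for very long streams (it does not attempt full ISO 8601, e.g. month components).

```typescript
// Sketch: convert durations like "PT4M13S" or "P1DT2H" into seconds.
export function iso8601DurationToSeconds(duration: string): number {
  const match = /^P(?:(\d+)D)?T?(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$/.exec(duration);
  if (!match) throw new Error(`Unrecognized duration: ${duration}`);
  // Index 0 is the full match; missing groups default to 0.
  const [, d, h, m, s] = match.map((v) => (v ? Number(v) : 0));
  return ((d * 24 + h) * 60 + m) * 60 + s;
}
```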

View File

@@ -4,22 +4,22 @@ import { auth } from '@/lib/auth'
import { isAuthDisabled } from '@/lib/core/config/feature-flags'
export async function POST() {
try {
if (isAuthDisabled) {
return NextResponse.json({ token: 'anonymous-socket-token' })
}
if (isAuthDisabled) {
return NextResponse.json({ token: 'anonymous-socket-token' })
}
try {
const hdrs = await headers()
const response = await auth.api.generateOneTimeToken({
headers: hdrs,
})
if (!response) {
return NextResponse.json({ error: 'Failed to generate token' }, { status: 500 })
if (!response?.token) {
return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
}
return NextResponse.json({ token: response.token })
} catch (error) {
} catch {
return NextResponse.json({ error: 'Failed to generate token' }, { status: 500 })
}
}

View File

@@ -56,7 +56,7 @@ export async function GET(_request: NextRequest, { params }: { params: Promise<{
deploymentVersionName: workflowDeploymentVersion.name,
})
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.leftJoin(
workflowDeploymentVersion,
eq(workflowDeploymentVersion.id, workflowExecutionLogs.deploymentVersionId)
@@ -65,7 +65,7 @@ export async function GET(_request: NextRequest, { params }: { params: Promise<{
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.entityId, workflowExecutionLogs.workspaceId),
eq(permissions.userId, userId)
)
)
@@ -77,17 +77,19 @@ export async function GET(_request: NextRequest, { params }: { params: Promise<{
return NextResponse.json({ error: 'Not found' }, { status: 404 })
}
const workflowSummary = {
id: log.workflowId,
name: log.workflowName,
description: log.workflowDescription,
color: log.workflowColor,
folderId: log.workflowFolderId,
userId: log.workflowUserId,
workspaceId: log.workflowWorkspaceId,
createdAt: log.workflowCreatedAt,
updatedAt: log.workflowUpdatedAt,
}
const workflowSummary = log.workflowId
? {
id: log.workflowId,
name: log.workflowName,
description: log.workflowDescription,
color: log.workflowColor,
folderId: log.workflowFolderId,
userId: log.workflowUserId,
workspaceId: log.workflowWorkspaceId,
createdAt: log.workflowCreatedAt,
updatedAt: log.workflowUpdatedAt,
}
: null
const response = {
id: log.id,

View File

@@ -1,5 +1,5 @@
import { db } from '@sim/db'
import { subscription, user, workflow, workflowExecutionLogs } from '@sim/db/schema'
import { subscription, user, workflowExecutionLogs, workspace } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
@@ -40,17 +40,17 @@ export async function GET(request: NextRequest) {
const freeUserIds = freeUsers.map((u) => u.userId)
const workflowsQuery = await db
.select({ id: workflow.id })
.from(workflow)
.where(inArray(workflow.userId, freeUserIds))
const workspacesQuery = await db
.select({ id: workspace.id })
.from(workspace)
.where(inArray(workspace.billedAccountUserId, freeUserIds))
if (workflowsQuery.length === 0) {
logger.info('No workflows found for free users')
return NextResponse.json({ message: 'No workflows found for cleanup' })
if (workspacesQuery.length === 0) {
logger.info('No workspaces found for free users')
return NextResponse.json({ message: 'No workspaces found for cleanup' })
}
const workflowIds = workflowsQuery.map((w) => w.id)
const workspaceIds = workspacesQuery.map((w) => w.id)
const results = {
enhancedLogs: {
@@ -77,7 +77,7 @@ export async function GET(request: NextRequest) {
let batchesProcessed = 0
let hasMoreLogs = true
logger.info(`Starting enhanced logs cleanup for ${workflowIds.length} workflows`)
logger.info(`Starting enhanced logs cleanup for ${workspaceIds.length} workspaces`)
while (hasMoreLogs && batchesProcessed < MAX_BATCHES) {
const oldEnhancedLogs = await db
@@ -99,7 +99,7 @@ export async function GET(request: NextRequest) {
.from(workflowExecutionLogs)
.where(
and(
inArray(workflowExecutionLogs.workflowId, workflowIds),
inArray(workflowExecutionLogs.workspaceId, workspaceIds),
lt(workflowExecutionLogs.createdAt, retentionDate)
)
)
@@ -127,7 +127,7 @@ export async function GET(request: NextRequest) {
customKey: enhancedLogKey,
metadata: {
logId: String(log.id),
workflowId: String(log.workflowId),
workflowId: String(log.workflowId ?? ''),
executionId: String(log.executionId),
logType: 'enhanced',
archivedAt: new Date().toISOString(),

View File

@@ -6,10 +6,11 @@ import {
workflowExecutionSnapshots,
} from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { and, eq, inArray } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import type { TraceSpan, WorkflowExecutionLog } from '@/lib/logs/types'
const logger = createLogger('LogsByExecutionIdAPI')
@@ -48,14 +49,15 @@ export async function GET(
endedAt: workflowExecutionLogs.endedAt,
totalDurationMs: workflowExecutionLogs.totalDurationMs,
cost: workflowExecutionLogs.cost,
executionData: workflowExecutionLogs.executionData,
})
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.entityId, workflowExecutionLogs.workspaceId),
eq(permissions.userId, authenticatedUserId)
)
)
@@ -78,10 +80,42 @@ export async function GET(
return NextResponse.json({ error: 'Workflow state snapshot not found' }, { status: 404 })
}
const executionData = workflowLog.executionData as WorkflowExecutionLog['executionData']
const traceSpans = (executionData?.traceSpans as TraceSpan[]) || []
const childSnapshotIds = new Set<string>()
const collectSnapshotIds = (spans: TraceSpan[]) => {
spans.forEach((span) => {
const snapshotId = span.childWorkflowSnapshotId
if (typeof snapshotId === 'string') {
childSnapshotIds.add(snapshotId)
}
if (span.children?.length) {
collectSnapshotIds(span.children)
}
})
}
if (traceSpans.length > 0) {
collectSnapshotIds(traceSpans)
}
const childWorkflowSnapshots =
childSnapshotIds.size > 0
? await db
.select()
.from(workflowExecutionSnapshots)
.where(inArray(workflowExecutionSnapshots.id, Array.from(childSnapshotIds)))
: []
const childSnapshotMap = childWorkflowSnapshots.reduce<Record<string, unknown>>((acc, snap) => {
acc[snap.id] = snap.stateData
return acc
}, {})
const response = {
executionId,
workflowId: workflowLog.workflowId,
workflowState: snapshot.stateData,
childWorkflowSnapshots: childSnapshotMap,
executionMetadata: {
trigger: workflowLog.trigger,
startedAt: workflowLog.startedAt.toISOString(),

View File

@@ -1,7 +1,7 @@
import { db } from '@sim/db'
import { permissions, workflow, workflowExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, desc, eq } from 'drizzle-orm'
import { and, desc, eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { buildFilterConditions, LogFilterParamsSchema } from '@/lib/logs/filters'
@@ -41,7 +41,7 @@ export async function GET(request: NextRequest) {
totalDurationMs: workflowExecutionLogs.totalDurationMs,
cost: workflowExecutionLogs.cost,
executionData: workflowExecutionLogs.executionData,
-workflowName: workflow.name,
+workflowName: sql<string>`COALESCE(${workflow.name}, 'Deleted Workflow')`,
}
const workspaceCondition = eq(workflowExecutionLogs.workspaceId, params.workspaceId)
@@ -74,7 +74,7 @@ export async function GET(request: NextRequest) {
const rows = await db
.select(selectColumns)
.from(workflowExecutionLogs)
-.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
+.leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(


@@ -116,7 +116,7 @@ export async function GET(request: NextRequest) {
workflowDeploymentVersion,
eq(workflowDeploymentVersion.id, workflowExecutionLogs.deploymentVersionId)
)
-.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
+.leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
@@ -190,7 +190,7 @@ export async function GET(request: NextRequest) {
pausedExecutions,
eq(pausedExecutions.executionId, workflowExecutionLogs.executionId)
)
-.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
+.leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
@@ -314,17 +314,19 @@ export async function GET(request: NextRequest) {
} catch {}
}
-const workflowSummary = {
-id: log.workflowId,
-name: log.workflowName,
-description: log.workflowDescription,
-color: log.workflowColor,
-folderId: log.workflowFolderId,
-userId: log.workflowUserId,
-workspaceId: log.workflowWorkspaceId,
-createdAt: log.workflowCreatedAt,
-updatedAt: log.workflowUpdatedAt,
-}
+const workflowSummary = log.workflowId
+? {
+id: log.workflowId,
+name: log.workflowName,
+description: log.workflowDescription,
+color: log.workflowColor,
+folderId: log.workflowFolderId,
+userId: log.workflowUserId,
+workspaceId: log.workflowWorkspaceId,
+createdAt: log.workflowCreatedAt,
+updatedAt: log.workflowUpdatedAt,
+}
+: null
return {
id: log.id,


@@ -72,7 +72,7 @@ export async function GET(request: NextRequest) {
maxTime: sql<string>`MAX(${workflowExecutionLogs.startedAt})`,
})
.from(workflowExecutionLogs)
-.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
+.leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
@@ -103,8 +103,8 @@ export async function GET(request: NextRequest) {
const statsQuery = await db
.select({
-workflowId: workflowExecutionLogs.workflowId,
-workflowName: workflow.name,
+workflowId: sql<string>`COALESCE(${workflowExecutionLogs.workflowId}, 'deleted')`,
+workflowName: sql<string>`COALESCE(${workflow.name}, 'Deleted Workflow')`,
segmentIndex:
sql<number>`FLOOR(EXTRACT(EPOCH FROM (${workflowExecutionLogs.startedAt} - ${startTimeIso}::timestamp)) * 1000 / ${segmentMs})`.as(
'segment_index'
@@ -120,7 +120,7 @@ export async function GET(request: NextRequest) {
),
})
.from(workflowExecutionLogs)
-.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
+.leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
@@ -130,7 +130,11 @@ export async function GET(request: NextRequest) {
)
)
.where(whereCondition)
-.groupBy(workflowExecutionLogs.workflowId, workflow.name, sql`segment_index`)
+.groupBy(
+sql`COALESCE(${workflowExecutionLogs.workflowId}, 'deleted')`,
+sql`COALESCE(${workflow.name}, 'Deleted Workflow')`,
+sql`segment_index`
+)
const workflowMap = new Map<
string,


@@ -97,7 +97,10 @@ export async function POST(
const socketServerUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
await fetch(`${socketServerUrl}/api/workflow-reverted`, {
method: 'POST',
-headers: { 'Content-Type': 'application/json' },
+headers: {
+'Content-Type': 'application/json',
+'x-api-key': env.INTERNAL_API_SECRET,
+},
body: JSON.stringify({ workflowId: id, timestamp: Date.now() }),
})
} catch (e) {


@@ -133,9 +133,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
const finalWorkflowData = {
...workflowData,
state: {
// Default values for expected properties
deploymentStatuses: {},
// Data from normalized tables
blocks: normalizedData.blocks,
edges: normalizedData.edges,
loops: normalizedData.loops,
@@ -143,8 +141,11 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
lastSaved: Date.now(),
isDeployed: workflowData.isDeployed || false,
deployedAt: workflowData.deployedAt,
metadata: {
name: workflowData.name,
description: workflowData.description,
},
},
// Include workflow variables
variables: workflowData.variables || {},
}
@@ -166,6 +167,10 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
lastSaved: Date.now(),
isDeployed: workflowData.isDeployed || false,
deployedAt: workflowData.deployedAt,
metadata: {
name: workflowData.name,
description: workflowData.description,
},
},
variables: workflowData.variables || {},
}
@@ -356,7 +361,10 @@ export async function DELETE(
const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
const socketResponse = await fetch(`${socketUrl}/api/workflow-deleted`, {
method: 'POST',
-headers: { 'Content-Type': 'application/json' },
+headers: {
+'Content-Type': 'application/json',
+'x-api-key': env.INTERNAL_API_SECRET,
+},
body: JSON.stringify({ workflowId }),
})


@@ -254,7 +254,10 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
const notifyResponse = await fetch(`${socketUrl}/api/workflow-updated`, {
method: 'POST',
-headers: { 'Content-Type': 'application/json' },
+headers: {
+'Content-Type': 'application/json',
+'x-api-key': env.INTERNAL_API_SECRET,
+},
body: JSON.stringify({ workflowId }),
})


@@ -215,6 +215,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
}
for (const log of logs) {
if (!log.workflowId) continue // Skip logs for deleted workflows
const idx = Math.min(
segments - 1,
Math.max(0, Math.floor((log.startedAt.getTime() - start.getTime()) / segmentMs))


@@ -1,5 +1,9 @@
import { memo } from 'react'
import { cn } from '@/lib/core/utils/cn'
import {
DELETED_WORKFLOW_COLOR,
DELETED_WORKFLOW_LABEL,
} from '@/app/workspace/[workspaceId]/logs/utils'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { StatusBar, type StatusBarSegment } from '..'
@@ -61,22 +65,32 @@ export function WorkflowsList({
<div>
{filteredExecutions.map((workflow, idx) => {
const isSelected = expandedWorkflowId === workflow.workflowId
const isDeletedWorkflow = workflow.workflowName === DELETED_WORKFLOW_LABEL
const workflowColor = isDeletedWorkflow
? DELETED_WORKFLOW_COLOR
: workflows[workflow.workflowId]?.color || '#64748b'
const canToggle = !isDeletedWorkflow
return (
<div
key={workflow.workflowId}
className={cn(
-'flex h-[44px] cursor-pointer items-center gap-[16px] px-[24px] hover:bg-[var(--surface-3)] dark:hover:bg-[var(--surface-4)]',
+'flex h-[44px] items-center gap-[16px] px-[24px] hover:bg-[var(--surface-3)] dark:hover:bg-[var(--surface-4)]',
+canToggle ? 'cursor-pointer' : 'cursor-default',
isSelected && 'bg-[var(--surface-3)] dark:bg-[var(--surface-4)]'
)}
-onClick={() => onToggleWorkflow(workflow.workflowId)}
+onClick={() => {
+if (canToggle) {
+onToggleWorkflow(workflow.workflowId)
+}
+}}
>
{/* Workflow name with color */}
<div className='flex w-[160px] flex-shrink-0 items-center gap-[8px] pr-[8px]'>
<div
className='h-[10px] w-[10px] flex-shrink-0 rounded-[3px]'
style={{
-backgroundColor: workflows[workflow.workflowId]?.color || '#64748b',
+backgroundColor: workflowColor,
}}
/>
<span className='min-w-0 truncate font-medium text-[12px] text-[var(--text-primary)]'>


@@ -80,6 +80,9 @@ export function ExecutionSnapshot({
}, [executionId, closeMenu])
const workflowState = data?.workflowState as WorkflowState | undefined
const childWorkflowSnapshots = data?.childWorkflowSnapshots as
| Record<string, WorkflowState>
| undefined
const renderContent = () => {
if (isLoading) {
@@ -148,6 +151,7 @@ export function ExecutionSnapshot({
key={executionId}
workflowState={workflowState}
traceSpans={traceSpans}
childWorkflowSnapshots={childWorkflowSnapshots}
className={className}
height={height}
width={width}


@@ -26,6 +26,8 @@ import {
} from '@/app/workspace/[workspaceId]/logs/components'
import { useLogDetailsResize } from '@/app/workspace/[workspaceId]/logs/hooks'
import {
DELETED_WORKFLOW_COLOR,
DELETED_WORKFLOW_LABEL,
formatDate,
getDisplayStatus,
StatusBadge,
@@ -386,22 +388,25 @@ export const LogDetails = memo(function LogDetails({
</div>
{/* Workflow Card */}
{log.workflow && (
<div className='flex w-0 min-w-0 flex-1 flex-col gap-[8px]'>
<div className='font-medium text-[12px] text-[var(--text-tertiary)]'>
Workflow
</div>
<div className='flex min-w-0 items-center gap-[8px]'>
<div
className='h-[10px] w-[10px] flex-shrink-0 rounded-[3px]'
style={{ backgroundColor: log.workflow?.color }}
/>
<span className='min-w-0 flex-1 truncate font-medium text-[14px] text-[var(--text-secondary)]'>
{log.workflow.name}
</span>
</div>
<div className='flex w-0 min-w-0 flex-1 flex-col gap-[8px]'>
<div className='font-medium text-[12px] text-[var(--text-tertiary)]'>
Workflow
</div>
)}
<div className='flex min-w-0 items-center gap-[8px]'>
<div
className='h-[10px] w-[10px] flex-shrink-0 rounded-[3px]'
style={{
backgroundColor:
log.workflow?.color ||
(!log.workflowId ? DELETED_WORKFLOW_COLOR : undefined),
}}
/>
<span className='min-w-0 flex-1 truncate font-medium text-[14px] text-[var(--text-secondary)]'>
{log.workflow?.name ||
(!log.workflowId ? DELETED_WORKFLOW_LABEL : 'Unknown')}
</span>
</div>
</div>
</div>
{/* Execution ID */}


@@ -7,6 +7,8 @@ import { List, type RowComponentProps, useListRef } from 'react-window'
import { Badge, buttonVariants } from '@/components/emcn'
import { cn } from '@/lib/core/utils/cn'
import {
DELETED_WORKFLOW_COLOR,
DELETED_WORKFLOW_LABEL,
formatDate,
formatDuration,
getDisplayStatus,
@@ -33,6 +35,11 @@ interface LogRowProps {
const LogRow = memo(
function LogRow({ log, isSelected, onClick, onContextMenu, selectedRowRef }: LogRowProps) {
const formattedDate = useMemo(() => formatDate(log.createdAt), [log.createdAt])
const isDeletedWorkflow = !log.workflow?.id && !log.workflowId
const workflowName = isDeletedWorkflow
? DELETED_WORKFLOW_LABEL
: log.workflow?.name || 'Unknown'
const workflowColor = isDeletedWorkflow ? DELETED_WORKFLOW_COLOR : log.workflow?.color
const handleClick = useCallback(() => onClick(log), [onClick, log])
@@ -78,10 +85,15 @@ const LogRow = memo(
>
<div
className='h-[10px] w-[10px] flex-shrink-0 rounded-[3px]'
-style={{ backgroundColor: log.workflow?.color }}
+style={{ backgroundColor: workflowColor }}
/>
-<span className='min-w-0 truncate font-medium text-[12px] text-[var(--text-primary)]'>
-{log.workflow?.name || 'Unknown'}
+<span
+className={cn(
+'min-w-0 truncate font-medium text-[12px]',
+isDeletedWorkflow ? 'text-[var(--text-tertiary)]' : 'text-[var(--text-primary)]'
+)}
+>
+{workflowName}
</span>
</div>


@@ -27,6 +27,9 @@ export const LOG_COLUMN_ORDER: readonly LogColumnKey[] = [
'duration',
] as const
export const DELETED_WORKFLOW_LABEL = 'Deleted Workflow'
export const DELETED_WORKFLOW_COLOR = 'var(--text-tertiary)'
export type LogStatus = 'error' | 'pending' | 'running' | 'info' | 'cancelled'
/**


@@ -1,6 +1,6 @@
'use client'
-import { useCallback, useEffect, useMemo, useState } from 'react'
+import { createElement, useCallback, useEffect, useMemo, useState } from 'react'
import { createLogger } from '@sim/logger'
import { ExternalLink, Users } from 'lucide-react'
import { Button, Combobox } from '@/components/emcn/components'
@@ -203,7 +203,7 @@ export function CredentialSelector({
if (!baseProviderConfig) {
return <ExternalLink className='h-3 w-3' />
}
-return baseProviderConfig.icon({ className: 'h-3 w-3' })
+return createElement(baseProviderConfig.icon, { className: 'h-3 w-3' })
}, [])
const getProviderName = useCallback((providerName: OAuthProvider) => {


@@ -23,6 +23,7 @@ interface SelectorComboboxProps {
readOnly?: boolean
onOptionChange?: (value: string) => void
allowSearch?: boolean
missingOptionLabel?: string
}
export function SelectorCombobox({
@@ -37,6 +38,7 @@ export function SelectorCombobox({
readOnly,
onOptionChange,
allowSearch = true,
missingOptionLabel,
}: SelectorComboboxProps) {
const [storeValueRaw, setStoreValue] = useSubBlockValue<string | null | undefined>(
blockId,
@@ -60,7 +62,16 @@ export function SelectorCombobox({
detailId: activeValue,
})
const optionMap = useSelectorOptionMap(options, detailOption ?? undefined)
-const selectedLabel = activeValue ? (optionMap.get(activeValue)?.label ?? activeValue) : ''
+const hasMissingOption =
+Boolean(activeValue) &&
+Boolean(missingOptionLabel) &&
+!isLoading &&
+!optionMap.get(activeValue!)
+const selectedLabel = activeValue
+? hasMissingOption
+? missingOptionLabel
+: (optionMap.get(activeValue)?.label ?? activeValue)
+: ''
const [inputValue, setInputValue] = useState(selectedLabel)
const previousActiveValue = useRef<string | undefined>(activeValue)


@@ -1,4 +1,4 @@
-import { useCallback, useEffect, useMemo, useState } from 'react'
+import { createElement, useCallback, useEffect, useMemo, useState } from 'react'
import { ExternalLink } from 'lucide-react'
import { Button, Combobox } from '@/components/emcn/components'
import {
@@ -22,7 +22,7 @@ const getProviderIcon = (providerName: OAuthProvider) => {
if (!baseProviderConfig) {
return <ExternalLink className='h-3 w-3' />
}
-return baseProviderConfig.icon({ className: 'h-3 w-3' })
+return createElement(baseProviderConfig.icon, { className: 'h-3 w-3' })
}
const getProviderName = (providerName: OAuthProvider) => {


@@ -1,6 +1,7 @@
'use client'
import { useMemo } from 'react'
import { DELETED_WORKFLOW_LABEL } from '@/app/workspace/[workspaceId]/logs/utils'
import { SelectorCombobox } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/selector-combobox/selector-combobox'
import type { SubBlockConfig } from '@/blocks/types'
import type { SelectorContext } from '@/hooks/selectors/types'
@@ -40,6 +41,7 @@ export function WorkflowSelectorInput({
isPreview={isPreview}
previewValue={previewValue}
placeholder={subBlock.placeholder || 'Select workflow...'}
missingOptionLabel={DELETED_WORKFLOW_LABEL}
/>
)
}


@@ -4,11 +4,14 @@ import { useCallback, useEffect, useMemo, useRef, useState } from 'react'
import {
ArrowDown,
ArrowUp,
Check,
ChevronDown as ChevronDownIcon,
ChevronUp,
Clipboard,
ExternalLink,
Maximize2,
RepeatIcon,
Search,
SplitIcon,
X,
} from 'lucide-react'
@@ -34,6 +37,7 @@ import {
isSubBlockFeatureEnabled,
isSubBlockVisibleForMode,
} from '@/lib/workflows/subblocks/visibility'
import { DELETED_WORKFLOW_LABEL } from '@/app/workspace/[workspaceId]/logs/utils'
import { SubBlock } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components'
import { PreviewContextMenu } from '@/app/workspace/[workspaceId]/w/components/preview/components/preview-context-menu'
import { PreviewWorkflow } from '@/app/workspace/[workspaceId]/w/components/preview/components/preview-workflow'
@@ -690,6 +694,7 @@ interface ExecutionData {
output?: unknown
status?: string
durationMs?: number
childWorkflowSnapshotId?: string
}
interface WorkflowVariable {
@@ -714,6 +719,8 @@ interface PreviewEditorProps {
parallels?: Record<string, Parallel>
/** When true, shows "Not Executed" badge if no executionData is provided */
isExecutionMode?: boolean
/** Child workflow snapshots keyed by snapshot ID (execution mode only) */
childWorkflowSnapshots?: Record<string, WorkflowState>
/** Optional close handler - if not provided, no close button is shown */
onClose?: () => void
/** Callback to drill down into a nested workflow block */
@@ -739,6 +746,7 @@ function PreviewEditorContent({
loops,
parallels,
isExecutionMode = false,
childWorkflowSnapshots,
onClose,
onDrillDown,
}: PreviewEditorProps) {
@@ -768,17 +776,35 @@ function PreviewEditorContent({
const { data: childWorkflowState, isLoading: isLoadingChildWorkflow } = useWorkflowState(
childWorkflowId ?? undefined
)
const childWorkflowSnapshotId = executionData?.childWorkflowSnapshotId
const childWorkflowSnapshotState = childWorkflowSnapshotId
? childWorkflowSnapshots?.[childWorkflowSnapshotId]
: undefined
const resolvedChildWorkflowState = isExecutionMode
? childWorkflowSnapshotState
: childWorkflowState
const resolvedIsLoadingChildWorkflow = isExecutionMode ? false : isLoadingChildWorkflow
const isMissingChildWorkflow =
Boolean(childWorkflowId) && !resolvedIsLoadingChildWorkflow && !resolvedChildWorkflowState
/** Drills down into the child workflow or opens it in a new tab */
const handleExpandChildWorkflow = useCallback(() => {
-if (!childWorkflowId || !childWorkflowState) return
+if (!childWorkflowId) return
if (isExecutionMode && onDrillDown) {
-onDrillDown(block.id, childWorkflowState)
+if (!childWorkflowSnapshotState) return
+onDrillDown(block.id, childWorkflowSnapshotState)
} else if (workspaceId) {
window.open(`/workspace/${workspaceId}/w/${childWorkflowId}`, '_blank', 'noopener,noreferrer')
}
-}, [childWorkflowId, childWorkflowState, isExecutionMode, onDrillDown, block.id, workspaceId])
+}, [
+childWorkflowId,
+childWorkflowSnapshotState,
+isExecutionMode,
+onDrillDown,
+block.id,
+workspaceId,
+])
const contentRef = useRef<HTMLDivElement>(null)
const subBlocksRef = useRef<HTMLDivElement>(null)
@@ -813,6 +839,13 @@ function PreviewEditorContent({
} = useContextMenu()
const [contextMenuData, setContextMenuData] = useState({ content: '', copyOnly: false })
const [copiedSection, setCopiedSection] = useState<'input' | 'output' | null>(null)
const handleCopySection = useCallback((content: string, section: 'input' | 'output') => {
navigator.clipboard.writeText(content)
setCopiedSection(section)
setTimeout(() => setCopiedSection(null), 1500)
}, [])
const openContextMenu = useCallback(
(e: React.MouseEvent, content: string, copyOnly: boolean) => {
@@ -862,9 +895,6 @@ function PreviewEditorContent({
}
}, [contextMenuData.content])
/**
* Handles mouse down event on the resize handle to initiate resizing
*/
const handleConnectionsResizeMouseDown = useCallback(
(e: React.MouseEvent) => {
setIsResizing(true)
@@ -874,18 +904,12 @@ function PreviewEditorContent({
[connectionsHeight]
)
/**
* Toggle connections collapsed state
*/
const toggleConnectionsCollapsed = useCallback(() => {
setConnectionsHeight((prev) =>
prev <= MIN_CONNECTIONS_HEIGHT ? DEFAULT_CONNECTIONS_HEIGHT : MIN_CONNECTIONS_HEIGHT
)
}, [])
/**
* Sets up resize event listeners during resize operations
*/
useEffect(() => {
if (!isResizing) return
@@ -1205,7 +1229,11 @@ function PreviewEditorContent({
}
emptyMessage='No input data'
>
-<div onContextMenu={handleExecutionContextMenu} ref={contentRef}>
+<div
+onContextMenu={handleExecutionContextMenu}
+ref={contentRef}
+className='relative'
+>
<Code.Viewer
code={formatValueAsJson(executionData.input)}
language='json'
@@ -1215,6 +1243,49 @@ function PreviewEditorContent({
currentMatchIndex={currentMatchIndex}
onMatchCountChange={handleMatchCountChange}
/>
{/* Action buttons overlay */}
{!isSearchActive && (
<div className='absolute top-[7px] right-[6px] z-10 flex gap-[4px]'>
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
type='button'
variant='ghost'
onClick={(e) => {
e.stopPropagation()
handleCopySection(formatValueAsJson(executionData.input), 'input')
}}
className='h-[20px] w-[20px] cursor-pointer border border-[var(--border-1)] bg-transparent p-0 backdrop-blur-sm hover:bg-[var(--surface-4)]'
>
{copiedSection === 'input' ? (
<Check className='h-[10px] w-[10px] text-[var(--text-success)]' />
) : (
<Clipboard className='h-[10px] w-[10px]' />
)}
</Button>
</Tooltip.Trigger>
<Tooltip.Content side='top'>
{copiedSection === 'input' ? 'Copied' : 'Copy'}
</Tooltip.Content>
</Tooltip.Root>
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
type='button'
variant='ghost'
onClick={(e) => {
e.stopPropagation()
activateSearch()
}}
className='h-[20px] w-[20px] cursor-pointer border border-[var(--border-1)] bg-transparent p-0 backdrop-blur-sm hover:bg-[var(--surface-4)]'
>
<Search className='h-[10px] w-[10px]' />
</Button>
</Tooltip.Trigger>
<Tooltip.Content side='top'>Search</Tooltip.Content>
</Tooltip.Root>
</div>
)}
</div>
</CollapsibleSection>
)}
@@ -1231,7 +1302,7 @@ function PreviewEditorContent({
emptyMessage='No output data'
isError={executionData.status === 'error'}
>
-<div onContextMenu={handleExecutionContextMenu}>
+<div onContextMenu={handleExecutionContextMenu} className='relative'>
<Code.Viewer
code={formatValueAsJson(executionData.output)}
language='json'
@@ -1244,6 +1315,49 @@ function PreviewEditorContent({
currentMatchIndex={currentMatchIndex}
onMatchCountChange={handleMatchCountChange}
/>
{/* Action buttons overlay */}
{!isSearchActive && (
<div className='absolute top-[7px] right-[6px] z-10 flex gap-[4px]'>
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
type='button'
variant='ghost'
onClick={(e) => {
e.stopPropagation()
handleCopySection(formatValueAsJson(executionData.output), 'output')
}}
className='h-[20px] w-[20px] cursor-pointer border border-[var(--border-1)] bg-transparent p-0 backdrop-blur-sm hover:bg-[var(--surface-4)]'
>
{copiedSection === 'output' ? (
<Check className='h-[10px] w-[10px] text-[var(--text-success)]' />
) : (
<Clipboard className='h-[10px] w-[10px]' />
)}
</Button>
</Tooltip.Trigger>
<Tooltip.Content side='top'>
{copiedSection === 'output' ? 'Copied' : 'Copy'}
</Tooltip.Content>
</Tooltip.Root>
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
type='button'
variant='ghost'
onClick={(e) => {
e.stopPropagation()
activateSearch()
}}
className='h-[20px] w-[20px] cursor-pointer border border-[var(--border-1)] bg-transparent p-0 backdrop-blur-sm hover:bg-[var(--surface-4)]'
>
<Search className='h-[10px] w-[10px]' />
</Button>
</Tooltip.Trigger>
<Tooltip.Content side='top'>Search</Tooltip.Content>
</Tooltip.Root>
</div>
)}
</div>
</CollapsibleSection>
)}
@@ -1256,7 +1370,7 @@ function PreviewEditorContent({
Workflow Preview
</div>
<div className='relative h-[160px] overflow-hidden rounded-[4px] border border-[var(--border)]'>
-{isLoadingChildWorkflow ? (
+{resolvedIsLoadingChildWorkflow ? (
<div className='flex h-full items-center justify-center bg-[var(--surface-3)]'>
<div
className='h-[18px] w-[18px] animate-spin rounded-full'
@@ -1269,11 +1383,11 @@ function PreviewEditorContent({
}}
/>
</div>
-) : childWorkflowState ? (
+) : resolvedChildWorkflowState ? (
<>
<div className='[&_*:active]:!cursor-grabbing [&_*]:!cursor-grab [&_.react-flow__handle]:!hidden h-full w-full'>
<PreviewWorkflow
-workflowState={childWorkflowState}
+workflowState={resolvedChildWorkflowState}
height={160}
width='100%'
isPannable={true}
@@ -1305,7 +1419,9 @@ function PreviewEditorContent({
) : (
<div className='flex h-full items-center justify-center bg-[var(--surface-3)]'>
<span className='text-[13px] text-[var(--text-tertiary)]'>
-Unable to load preview
+{isMissingChildWorkflow
+? DELETED_WORKFLOW_LABEL
+: 'Unable to load preview'}
</span>
</div>
)}


@@ -9,6 +9,7 @@ import {
isSubBlockFeatureEnabled,
isSubBlockVisibleForMode,
} from '@/lib/workflows/subblocks/visibility'
import { DELETED_WORKFLOW_LABEL } from '@/app/workspace/[workspaceId]/logs/utils'
import { getDisplayValue } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/workflow-block'
import { getBlock } from '@/blocks'
import { SELECTOR_TYPES_HYDRATION_REQUIRED, type SubBlockConfig } from '@/blocks/types'
@@ -112,7 +113,7 @@ function resolveWorkflowName(
if (!rawValue || typeof rawValue !== 'string') return null
const workflowMap = useWorkflowRegistry.getState().workflows
-return workflowMap[rawValue]?.name ?? null
+return workflowMap[rawValue]?.name ?? DELETED_WORKFLOW_LABEL
}
/**


@@ -19,6 +19,8 @@ interface TraceSpan {
status?: string
duration?: number
children?: TraceSpan[]
childWorkflowSnapshotId?: string
childWorkflowId?: string
}
interface BlockExecutionData {
@@ -28,6 +30,7 @@ interface BlockExecutionData {
durationMs: number
/** Child trace spans for nested workflow blocks */
children?: TraceSpan[]
childWorkflowSnapshotId?: string
}
/** Represents a level in the workflow navigation stack */
@@ -35,6 +38,7 @@ interface WorkflowStackEntry {
workflowState: WorkflowState
traceSpans: TraceSpan[]
blockExecutions: Record<string, BlockExecutionData>
workflowName: string
}
/**
@@ -89,6 +93,7 @@ export function buildBlockExecutions(spans: TraceSpan[]): Record<string, BlockEx
status: span.status || 'unknown',
durationMs: span.duration || 0,
children: span.children,
childWorkflowSnapshotId: span.childWorkflowSnapshotId,
}
}
}
@@ -103,6 +108,8 @@ interface PreviewProps {
traceSpans?: TraceSpan[]
/** Pre-computed block executions (optional - will be built from traceSpans if not provided) */
blockExecutions?: Record<string, BlockExecutionData>
/** Child workflow snapshots keyed by snapshot ID (execution mode only) */
childWorkflowSnapshots?: Record<string, WorkflowState>
/** Additional CSS class names */
className?: string
/** Height of the component */
@@ -135,6 +142,7 @@ export function Preview({
workflowState: rootWorkflowState,
traceSpans: rootTraceSpans,
blockExecutions: providedBlockExecutions,
childWorkflowSnapshots,
className,
height = '100%',
width = '100%',
@@ -144,7 +152,6 @@ export function Preview({
initialSelectedBlockId,
autoSelectLeftmost = true,
}: PreviewProps) {
/** Initialize pinnedBlockId synchronously to ensure sidebar is present from first render */
const [pinnedBlockId, setPinnedBlockId] = useState<string | null>(() => {
if (initialSelectedBlockId) return initialSelectedBlockId
if (autoSelectLeftmost) {
@@ -153,17 +160,14 @@ export function Preview({
return null
})
/** Stack for nested workflow navigation. Empty means we're at the root level. */
const [workflowStack, setWorkflowStack] = useState<WorkflowStackEntry[]>([])
/** Block executions for the root level */
const rootBlockExecutions = useMemo(() => {
if (providedBlockExecutions) return providedBlockExecutions
if (!rootTraceSpans || !Array.isArray(rootTraceSpans)) return {}
return buildBlockExecutions(rootTraceSpans)
}, [providedBlockExecutions, rootTraceSpans])
/** Current block executions - either from stack or root */
const blockExecutions = useMemo(() => {
if (workflowStack.length > 0) {
return workflowStack[workflowStack.length - 1].blockExecutions
@@ -171,7 +175,6 @@ export function Preview({
return rootBlockExecutions
}, [workflowStack, rootBlockExecutions])
/** Current workflow state - either from stack or root */
const workflowState = useMemo(() => {
if (workflowStack.length > 0) {
return workflowStack[workflowStack.length - 1].workflowState
@@ -179,41 +182,39 @@ export function Preview({
return rootWorkflowState
}, [workflowStack, rootWorkflowState])
/** Whether we're in execution mode (have trace spans/block executions) */
const isExecutionMode = useMemo(() => {
return Object.keys(blockExecutions).length > 0
}, [blockExecutions])
/** Handler to drill down into a nested workflow block */
const handleDrillDown = useCallback(
(blockId: string, childWorkflowState: WorkflowState) => {
const blockExecution = blockExecutions[blockId]
const childTraceSpans = extractChildTraceSpans(blockExecution)
const childBlockExecutions = buildBlockExecutions(childTraceSpans)
const workflowName = childWorkflowState.metadata?.name || 'Nested Workflow'
setWorkflowStack((prev) => [
...prev,
{
workflowState: childWorkflowState,
traceSpans: childTraceSpans,
blockExecutions: childBlockExecutions,
workflowName,
},
])
/** Set pinned block synchronously to avoid double fitView from sidebar resize */
const leftmostId = getLeftmostBlockId(childWorkflowState)
setPinnedBlockId(leftmostId)
},
[blockExecutions]
)
/** Handler to go back up the stack */
const handleGoBack = useCallback(() => {
setWorkflowStack((prev) => prev.slice(0, -1))
setPinnedBlockId(null)
}, [])
/** Handlers for node interactions - memoized to prevent unnecessary re-renders */
const handleNodeClick = useCallback((blockId: string) => {
setPinnedBlockId(blockId)
}, [])
@@ -232,6 +233,8 @@ export function Preview({
const isNested = workflowStack.length > 0
const currentWorkflowName = isNested ? workflowStack[workflowStack.length - 1].workflowName : null
return (
<div
style={{ height, width }}
@@ -242,20 +245,27 @@ export function Preview({
)}
>
{isNested && (
-<div className='absolute top-[12px] left-[12px] z-20'>
+<div className='absolute top-[12px] left-[12px] z-20 flex items-center gap-[6px]'>
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
variant='ghost'
onClick={handleGoBack}
-className='flex h-[30px] items-center gap-[5px] border border-[var(--border)] bg-[var(--surface-2)] px-[10px] hover:bg-[var(--surface-4)]'
+className='flex h-[28px] items-center gap-[5px] rounded-[6px] border border-[var(--border)] bg-[var(--surface-2)] px-[10px] text-[var(--text-secondary)] shadow-sm hover:bg-[var(--surface-4)] hover:text-[var(--text-primary)]'
>
-<ArrowLeft className='h-[13px] w-[13px]' />
-<span className='font-medium text-[13px]'>Back</span>
+<ArrowLeft className='h-[12px] w-[12px]' />
+<span className='font-medium text-[12px]'>Back</span>
</Button>
</Tooltip.Trigger>
<Tooltip.Content side='bottom'>Go back to parent workflow</Tooltip.Content>
</Tooltip.Root>
{currentWorkflowName && (
<div className='flex h-[28px] max-w-[200px] items-center rounded-[6px] border border-[var(--border)] bg-[var(--surface-2)] px-[10px] shadow-sm'>
<span className='truncate font-medium text-[12px] text-[var(--text-secondary)]'>
{currentWorkflowName}
</span>
</div>
)}
</div>
)}
@@ -284,6 +294,7 @@ export function Preview({
loops={workflowState.loops}
parallels={workflowState.parallels}
isExecutionMode={isExecutionMode}
childWorkflowSnapshots={childWorkflowSnapshots}
onClose={handleEditorClose}
onDrillDown={handleDrillDown}
/>


@@ -1,6 +1,6 @@
'use client'
-import { useEffect, useRef, useState } from 'react'
+import { createElement, useEffect, useRef, useState } from 'react'
import { createLogger } from '@sim/logger'
import { Check, ChevronDown, ExternalLink, Search } from 'lucide-react'
import { useRouter, useSearchParams } from 'next/navigation'
@@ -339,9 +339,7 @@ export function Integrations({ onOpenChange, registerCloseHandler }: Integration
>
<div className='flex items-center gap-[12px]'>
<div className='flex h-9 w-9 flex-shrink-0 items-center justify-center overflow-hidden rounded-[6px] bg-[var(--surface-5)]'>
-{typeof service.icon === 'function'
-? service.icon({ className: 'h-4 w-4' })
-: service.icon}
+{createElement(service.icon, { className: 'h-4 w-4' })}
</div>
<div className='flex flex-col justify-center gap-[1px]'>
<span className='font-medium text-[14px]'>{service.name}</span>


@@ -17,6 +17,19 @@ import { getEnv } from '@/lib/core/config/env'
const logger = createLogger('SocketContext')
const TAB_SESSION_ID_KEY = 'sim_tab_session_id'
function getTabSessionId(): string {
if (typeof window === 'undefined') return ''
let tabSessionId = sessionStorage.getItem(TAB_SESSION_ID_KEY)
if (!tabSessionId) {
tabSessionId = crypto.randomUUID()
sessionStorage.setItem(TAB_SESSION_ID_KEY, tabSessionId)
}
return tabSessionId
}
interface User {
id: string
name?: string
@@ -36,11 +49,13 @@ interface SocketContextType {
socket: Socket | null
isConnected: boolean
isConnecting: boolean
authFailed: boolean
currentWorkflowId: string | null
currentSocketId: string | null
presenceUsers: PresenceUser[]
joinWorkflow: (workflowId: string) => void
leaveWorkflow: () => void
retryConnection: () => void
emitWorkflowOperation: (
operation: string,
target: string,
@@ -63,8 +78,6 @@ interface SocketContextType {
onCursorUpdate: (handler: (data: any) => void) => void
onSelectionUpdate: (handler: (data: any) => void) => void
onUserJoined: (handler: (data: any) => void) => void
onUserLeft: (handler: (data: any) => void) => void
onWorkflowDeleted: (handler: (data: any) => void) => void
onWorkflowReverted: (handler: (data: any) => void) => void
onOperationConfirmed: (handler: (data: any) => void) => void
@@ -75,11 +88,13 @@ const SocketContext = createContext<SocketContextType>({
socket: null,
isConnected: false,
isConnecting: false,
authFailed: false,
currentWorkflowId: null,
currentSocketId: null,
presenceUsers: [],
joinWorkflow: () => {},
leaveWorkflow: () => {},
retryConnection: () => {},
emitWorkflowOperation: () => {},
emitSubblockUpdate: () => {},
emitVariableUpdate: () => {},
@@ -90,8 +105,6 @@ const SocketContext = createContext<SocketContextType>({
onVariableUpdate: () => {},
onCursorUpdate: () => {},
onSelectionUpdate: () => {},
onUserJoined: () => {},
onUserLeft: () => {},
onWorkflowDeleted: () => {},
onWorkflowReverted: () => {},
onOperationConfirmed: () => {},
@@ -112,33 +125,43 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const [currentWorkflowId, setCurrentWorkflowId] = useState<string | null>(null)
const [currentSocketId, setCurrentSocketId] = useState<string | null>(null)
const [presenceUsers, setPresenceUsers] = useState<PresenceUser[]>([])
const [authFailed, setAuthFailed] = useState(false)
const initializedRef = useRef(false)
const socketRef = useRef<Socket | null>(null)
const params = useParams()
const urlWorkflowId = params?.workflowId as string | undefined
const urlWorkflowIdRef = useRef(urlWorkflowId)
urlWorkflowIdRef.current = urlWorkflowId
const eventHandlers = useRef<{
workflowOperation?: (data: any) => void
subblockUpdate?: (data: any) => void
variableUpdate?: (data: any) => void
cursorUpdate?: (data: any) => void
selectionUpdate?: (data: any) => void
userJoined?: (data: any) => void
userLeft?: (data: any) => void
workflowDeleted?: (data: any) => void
workflowReverted?: (data: any) => void
operationConfirmed?: (data: any) => void
operationFailed?: (data: any) => void
}>({})
const positionUpdateTimeouts = useRef<Map<string, number>>(new Map())
const isRejoiningRef = useRef<boolean>(false)
const pendingPositionUpdates = useRef<Map<string, any>>(new Map())
const generateSocketToken = async (): Promise<string> => {
const res = await fetch('/api/auth/socket-token', {
method: 'POST',
credentials: 'include',
headers: { 'cache-control': 'no-store' },
})
if (!res.ok) throw new Error('Failed to generate socket token')
if (!res.ok) {
if (res.status === 401) {
throw new Error('Authentication required')
}
throw new Error('Failed to generate socket token')
}
const body = await res.json().catch(() => ({}))
const token = body?.token
if (!token || typeof token !== 'string') throw new Error('Invalid socket token')
@@ -148,6 +171,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
useEffect(() => {
if (!user?.id) return
if (authFailed) {
logger.info('Socket initialization skipped - auth failed, waiting for retry')
return
}
if (initializedRef.current || socket || isConnecting) {
logger.info('Socket already exists or is connecting, skipping initialization')
return
@@ -180,7 +208,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
cb({ token: freshToken })
} catch (error) {
logger.error('Failed to generate fresh token for connection:', error)
cb({ token: null })
if (error instanceof Error && error.message === 'Authentication required') {
// True auth failure - pass null token, server will reject with "Authentication required"
cb({ token: null })
}
// For server errors, don't call cb - connection will timeout and Socket.IO will retry
}
},
})
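The token flow above distinguishes a true 401 (session gone, stop retrying) from transient server errors (let Socket.IO's retry loop proceed). A sketch of that classification with the fetch injected (function and type names here are illustrative):

```typescript
// Classifying token-fetch failures: 401 is terminal, everything else is
// treated as transient so the caller's retry machinery can handle it.
class AuthRequiredError extends Error {
  constructor() {
    super('Authentication required')
    this.name = 'AuthRequiredError'
  }
}

interface TokenResponse {
  ok: boolean
  status: number
  json(): Promise<any>
}

// fetchFn is injected so the sketch runs without a real server.
async function fetchSocketToken(
  fetchFn: (url: string) => Promise<TokenResponse>
): Promise<string> {
  const res = await fetchFn('/api/auth/socket-token')
  if (!res.ok) {
    if (res.status === 401) throw new AuthRequiredError() // terminal
    throw new Error('Failed to generate socket token') // transient
  }
  const body = await res.json().catch(() => ({}))
  if (typeof body?.token !== 'string') throw new Error('Invalid socket token')
  return body.token
}
```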
@@ -194,26 +226,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
connected: socketInstance.connected,
transport: socketInstance.io.engine?.transport?.name,
})
if (urlWorkflowId) {
logger.info(`Joining workflow room after connection: ${urlWorkflowId}`)
socketInstance.emit('join-workflow', {
workflowId: urlWorkflowId,
})
setCurrentWorkflowId(urlWorkflowId)
}
// Note: join-workflow is handled by the useEffect watching isConnected
})
socketInstance.on('disconnect', (reason) => {
setIsConnected(false)
setIsConnecting(false)
setCurrentSocketId(null)
setCurrentWorkflowId(null)
setPresenceUsers([])
logger.info('Socket disconnected', {
reason,
})
setPresenceUsers([])
})
socketInstance.on('connect_error', (error: any) => {
@@ -226,24 +251,34 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
transport: error.transport,
})
if (
// Check if this is an authentication failure
const isAuthError =
error.message?.includes('Token validation failed') ||
error.message?.includes('Authentication failed') ||
error.message?.includes('Authentication required')
) {
if (isAuthError) {
logger.warn(
'Authentication failed - this could indicate session expiry or token generation issues'
'Authentication failed - stopping reconnection attempts. User may need to refresh/re-login.'
)
// Stop reconnection attempts to prevent infinite loop
socketInstance.disconnect()
// Reset state to allow re-initialization when session is restored
setSocket(null)
setAuthFailed(true)
initializedRef.current = false
}
})
socketInstance.on('reconnect', (attemptNumber) => {
setIsConnected(true)
setCurrentSocketId(socketInstance.id ?? null)
logger.info('Socket reconnected successfully', {
attemptNumber,
socketId: socketInstance.id,
transport: socketInstance.io.engine?.transport?.name,
})
// Note: join-workflow is handled by the useEffect watching isConnected
})
socketInstance.on('reconnect_attempt', (attemptNumber) => {
@@ -284,6 +319,26 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
})
})
// Handle join workflow success - confirms room membership with presence list
socketInstance.on('join-workflow-success', ({ workflowId, presenceUsers }) => {
isRejoiningRef.current = false
// Ignore stale success responses from previous navigation
if (workflowId !== urlWorkflowIdRef.current) {
logger.debug(`Ignoring stale join-workflow-success for ${workflowId}`)
return
}
setCurrentWorkflowId(workflowId)
setPresenceUsers(presenceUsers || [])
logger.info(`Successfully joined workflow room: ${workflowId}`, {
presenceCount: presenceUsers?.length || 0,
})
})
socketInstance.on('join-workflow-error', ({ error }) => {
isRejoiningRef.current = false
logger.error('Failed to join workflow:', error)
})
socketInstance.on('workflow-operation', (data) => {
eventHandlers.current.workflowOperation?.(data)
})
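The `join-workflow-success` handler above guards against stale async responses: a success event is applied only if it still matches the most recently requested workflow id, held in a mutable ref so the long-lived listener always reads the current value. A stripped-down sketch of that guard (names illustrative):

```typescript
// Stale-response guard: apply an async success event only when it matches
// the latest requested id, read from a mutable ref at event time.
function makeJoinHandler(latest: { current: string | undefined }) {
  const applied: string[] = []
  return {
    applied,
    onSuccess(workflowId: string) {
      if (workflowId !== latest.current) return // stale: user navigated away
      applied.push(workflowId)
    },
  }
}
```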
@@ -298,10 +353,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socketInstance.on('workflow-deleted', (data) => {
logger.warn(`Workflow ${data.workflowId} has been deleted`)
if (currentWorkflowId === data.workflowId) {
setCurrentWorkflowId(null)
setPresenceUsers([])
}
setCurrentWorkflowId((current) => {
if (current === data.workflowId) {
setPresenceUsers([])
return null
}
return current
})
eventHandlers.current.workflowDeleted?.(data)
})
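The switch to the functional `setCurrentWorkflowId((current) => ...)` form matters because the `workflow-deleted` listener is registered once: any state read directly in its closure is frozen at registration time. A minimal state-cell sketch of the difference (setState semantics assumed to match React's):

```typescript
// Why the functional updater form is needed: a listener registered once
// captures state values as of registration; the functional form reads the
// value at update time instead.
type Updater<T> = T | ((prev: T) => T)

function makeCell<T>(initial: T) {
  let value = initial
  return {
    get: () => value,
    set: (u: Updater<T>) => {
      value = typeof u === 'function' ? (u as (p: T) => T)(value) : u
    },
  }
}

const cell = makeCell<string | null>(null)
const staleSnapshot = cell.get() // what a closure captured at registration: null

cell.set('wf-1') // workflow joined after the listener was registered

// A direct comparison against staleSnapshot would never match 'wf-1'.
// The functional form sees the live value and clears it correctly:
cell.set((current) => (current === 'wf-1' ? null : current))
```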
@@ -444,25 +502,35 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socketInstance.on('operation-forbidden', (error) => {
logger.warn('Operation forbidden:', error)
})
socketInstance.on('operation-confirmed', (data) => {
logger.debug('Operation confirmed:', data)
if (error?.type === 'SESSION_ERROR') {
const workflowId = urlWorkflowIdRef.current
if (workflowId && !isRejoiningRef.current) {
isRejoiningRef.current = true
logger.info(`Session expired, rejoining workflow: ${workflowId}`)
socketInstance.emit('join-workflow', {
workflowId,
tabSessionId: getTabSessionId(),
})
}
}
})
socketInstance.on('workflow-state', async (workflowData) => {
logger.info('Received workflow state from server')
if (workflowData?.state) {
await rehydrateWorkflowStores(workflowData.id, workflowData.state, 'workflow-state')
try {
await rehydrateWorkflowStores(workflowData.id, workflowData.state, 'workflow-state')
} catch (error) {
logger.error('Error rehydrating workflow state:', error)
}
}
})
socketRef.current = socketInstance
setSocket(socketInstance)
return () => {
socketInstance.close()
}
} catch (error) {
logger.error('Failed to initialize socket with token:', error)
setIsConnecting(false)
@@ -477,12 +545,20 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
})
positionUpdateTimeouts.current.clear()
pendingPositionUpdates.current.clear()
// Close socket on unmount
if (socketRef.current) {
logger.info('Closing socket connection on unmount')
socketRef.current.close()
socketRef.current = null
}
}
}, [user?.id])
}, [user?.id, authFailed])
useEffect(() => {
if (!socket || !isConnected || !urlWorkflowId) return
// Skip if already in the correct room
if (currentWorkflowId === urlWorkflowId) return
logger.info(
@@ -497,19 +573,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
logger.info(`Joining workflow room: ${urlWorkflowId}`)
socket.emit('join-workflow', {
workflowId: urlWorkflowId,
tabSessionId: getTabSessionId(),
})
setCurrentWorkflowId(urlWorkflowId)
}, [socket, isConnected, urlWorkflowId, currentWorkflowId])
useEffect(() => {
return () => {
if (socket) {
logger.info('Cleaning up socket connection on unmount')
socket.disconnect()
}
}
}, [])
const joinWorkflow = useCallback(
(workflowId: string) => {
if (!socket || !user?.id) {
@@ -530,8 +597,9 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
logger.info(`Joining workflow: ${workflowId}`)
socket.emit('join-workflow', {
workflowId,
tabSessionId: getTabSessionId(),
})
setCurrentWorkflowId(workflowId)
// currentWorkflowId will be set by join-workflow-success handler
},
[socket, user, currentWorkflowId]
)
@@ -539,10 +607,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const leaveWorkflow = useCallback(() => {
if (socket && currentWorkflowId) {
logger.info(`Leaving workflow: ${currentWorkflowId}`)
try {
const { useOperationQueueStore } = require('@/stores/operation-queue/store')
useOperationQueueStore.getState().cancelOperationsForWorkflow(currentWorkflowId)
} catch {}
import('@/stores/operation-queue/store')
.then(({ useOperationQueueStore }) => {
useOperationQueueStore.getState().cancelOperationsForWorkflow(currentWorkflowId)
})
.catch((error) => {
logger.warn('Failed to cancel operations for workflow:', error)
})
socket.emit('leave-workflow')
setCurrentWorkflowId(null)
setPresenceUsers([])
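Replacing `require` with a dynamic `import()` in `leaveWorkflow` keeps the store module off the synchronous path and, unlike the old empty `catch {}`, surfaces load failures in the logs. A sketch of the fire-and-forget shape with the module loader injected (names illustrative):

```typescript
// Fire-and-forget dynamic import: run a side effect when the module loads,
// and log failures instead of silently swallowing them.
type Logger = { warn: (msg: string, err?: unknown) => void }

function cancelOperations(
  loadStore: () => Promise<{ cancelForWorkflow: (id: string) => void }>,
  workflowId: string,
  logger: Logger
): Promise<void> {
  return loadStore()
    .then((store) => store.cancelForWorkflow(workflowId))
    .catch((error) => logger.warn('Failed to cancel operations for workflow:', error))
}
```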
@@ -555,8 +626,20 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}
}, [socket, currentWorkflowId])
const positionUpdateTimeouts = useRef<Map<string, number>>(new Map())
const pendingPositionUpdates = useRef<Map<string, any>>(new Map())
/**
* Retry socket connection after auth failure.
* Call this when user has re-authenticated (e.g., after login redirect).
*/
const retryConnection = useCallback(() => {
if (!authFailed) {
logger.info('retryConnection called but no auth failure - ignoring')
return
}
logger.info('Retrying socket connection after auth failure')
setAuthFailed(false)
// initializedRef.current was already reset in connect_error handler
// Effect will re-run and attempt connection
}, [authFailed])
const emitWorkflowOperation = useCallback(
(operation: string, target: string, payload: any, operationId?: string) => {
@@ -716,14 +799,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.selectionUpdate = handler
}, [])
const onUserJoined = useCallback((handler: (data: any) => void) => {
eventHandlers.current.userJoined = handler
}, [])
const onUserLeft = useCallback((handler: (data: any) => void) => {
eventHandlers.current.userLeft = handler
}, [])
const onWorkflowDeleted = useCallback((handler: (data: any) => void) => {
eventHandlers.current.workflowDeleted = handler
}, [])
@@ -745,11 +820,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socket,
isConnected,
isConnecting,
authFailed,
currentWorkflowId,
currentSocketId,
presenceUsers,
joinWorkflow,
leaveWorkflow,
retryConnection,
emitWorkflowOperation,
emitSubblockUpdate,
emitVariableUpdate,
@@ -760,8 +837,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
onVariableUpdate,
onCursorUpdate,
onSelectionUpdate,
onUserJoined,
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
onOperationConfirmed,
@@ -771,11 +846,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socket,
isConnected,
isConnecting,
authFailed,
currentWorkflowId,
currentSocketId,
presenceUsers,
joinWorkflow,
leaveWorkflow,
retryConnection,
emitWorkflowOperation,
emitSubblockUpdate,
emitVariableUpdate,
@@ -786,8 +863,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
onVariableUpdate,
onCursorUpdate,
onSelectionUpdate,
onUserJoined,
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
onOperationConfirmed,


@@ -66,7 +66,10 @@ function generateSignature(secret: string, timestamp: number, body: string): str
async function buildPayload(
log: WorkflowExecutionLog,
subscription: typeof workspaceNotificationSubscription.$inferSelect
): Promise<NotificationPayload> {
): Promise<NotificationPayload | null> {
// Skip notifications for deleted workflows
if (!log.workflowId) return null
const workflowData = await db
.select({ name: workflowTable.name, userId: workflowTable.userId })
.from(workflowTable)
@@ -526,6 +529,13 @@ export async function executeNotificationDelivery(params: NotificationDeliveryPa
const attempts = claimed[0].attempts
const payload = await buildPayload(log, subscription)
// Skip delivery for deleted workflows
if (!payload) {
await updateDeliveryStatus(deliveryId, 'failed', 'Workflow was deleted')
logger.info(`Skipping delivery ${deliveryId} - workflow was deleted`)
return
}
let result: { success: boolean; status?: number; error?: string }
switch (notificationType) {

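Widening `buildPayload` to return `NotificationPayload | null` pushes the deleted-workflow case into the type system: every caller must narrow before use, which is what the skip-delivery branch above does. A compressed sketch of the pattern (types illustrative):

```typescript
// Nullable-payload guard: returning null for deleted workflows makes the
// "skip delivery" branch compiler-enforced at each call site.
interface ExecLog {
  workflowId: string | null
}
interface Payload {
  workflowId: string
}

function buildPayload(log: ExecLog): Payload | null {
  if (!log.workflowId) return null // workflow was deleted
  return { workflowId: log.workflowId }
}

function deliver(log: ExecLog): string {
  const payload = buildPayload(log)
  if (!payload) return 'skipped' // narrowing required before property access
  return `delivered:${payload.workflowId}`
}
```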

@@ -9,7 +9,7 @@ export const YouTubeBlock: BlockConfig<YouTubeResponse> = {
description: 'Interact with YouTube videos, channels, and playlists',
authMode: AuthMode.ApiKey,
longDescription:
'Integrate YouTube into the workflow. Can search for videos, get video details, get channel information, get all videos from a channel, get channel playlists, get playlist items, find related videos, and get video comments.',
'Integrate YouTube into the workflow. Can search for videos, get trending videos, get video details, get video categories, get channel information, get all videos from a channel, get channel playlists, get playlist items, and get video comments.',
docsLink: 'https://docs.sim.ai/tools/youtube',
category: 'tools',
bgColor: '#FF0000',
@@ -21,7 +21,9 @@ export const YouTubeBlock: BlockConfig<YouTubeResponse> = {
type: 'dropdown',
options: [
{ label: 'Search Videos', id: 'youtube_search' },
{ label: 'Get Trending Videos', id: 'youtube_trending' },
{ label: 'Get Video Details', id: 'youtube_video_details' },
{ label: 'Get Video Categories', id: 'youtube_video_categories' },
{ label: 'Get Channel Info', id: 'youtube_channel_info' },
{ label: 'Get Channel Videos', id: 'youtube_channel_videos' },
{ label: 'Get Channel Playlists', id: 'youtube_channel_playlists' },
@@ -49,6 +51,13 @@ export const YouTubeBlock: BlockConfig<YouTubeResponse> = {
integer: true,
condition: { field: 'operation', value: 'youtube_search' },
},
{
id: 'pageToken',
title: 'Page Token',
type: 'short-input',
placeholder: 'Token for pagination (from nextPageToken)',
condition: { field: 'operation', value: 'youtube_search' },
},
{
id: 'channelId',
title: 'Filter by Channel ID',
@@ -56,6 +65,19 @@ export const YouTubeBlock: BlockConfig<YouTubeResponse> = {
placeholder: 'Filter results to a specific channel',
condition: { field: 'operation', value: 'youtube_search' },
},
{
id: 'eventType',
title: 'Live Stream Filter',
type: 'dropdown',
options: [
{ label: 'All Videos', id: '' },
{ label: 'Currently Live', id: 'live' },
{ label: 'Upcoming Streams', id: 'upcoming' },
{ label: 'Past Streams', id: 'completed' },
],
value: () => '',
condition: { field: 'operation', value: 'youtube_search' },
},
{
id: 'publishedAfter',
title: 'Published After',
@@ -131,7 +153,7 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
id: 'videoCategoryId',
title: 'Category ID',
type: 'short-input',
placeholder: '10 for Music, 20 for Gaming',
placeholder: 'Use Get Video Categories to find IDs',
condition: { field: 'operation', value: 'youtube_search' },
},
{
@@ -163,7 +185,10 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
title: 'Region Code',
type: 'short-input',
placeholder: 'US, GB, JP',
condition: { field: 'operation', value: 'youtube_search' },
condition: {
field: 'operation',
value: ['youtube_search', 'youtube_trending', 'youtube_video_categories'],
},
},
{
id: 'relevanceLanguage',
@@ -184,6 +209,31 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
value: () => 'moderate',
condition: { field: 'operation', value: 'youtube_search' },
},
// Get Trending Videos operation inputs
{
id: 'maxResults',
title: 'Max Results',
type: 'slider',
min: 1,
max: 50,
step: 1,
integer: true,
condition: { field: 'operation', value: 'youtube_trending' },
},
{
id: 'videoCategoryId',
title: 'Category ID',
type: 'short-input',
placeholder: 'Use Get Video Categories to find IDs',
condition: { field: 'operation', value: 'youtube_trending' },
},
{
id: 'pageToken',
title: 'Page Token',
type: 'short-input',
placeholder: 'Token for pagination (from nextPageToken)',
condition: { field: 'operation', value: 'youtube_trending' },
},
// Get Video Details operation inputs
{
id: 'videoId',
@@ -193,6 +243,14 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
required: true,
condition: { field: 'operation', value: 'youtube_video_details' },
},
// Get Video Categories operation inputs
{
id: 'hl',
title: 'Language',
type: 'short-input',
placeholder: 'en, es, fr (for category names)',
condition: { field: 'operation', value: 'youtube_video_categories' },
},
// Get Channel Info operation inputs
{
id: 'channelId',
@@ -241,6 +299,13 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
value: () => 'date',
condition: { field: 'operation', value: 'youtube_channel_videos' },
},
{
id: 'pageToken',
title: 'Page Token',
type: 'short-input',
placeholder: 'Token for pagination (from nextPageToken)',
condition: { field: 'operation', value: 'youtube_channel_videos' },
},
// Get Channel Playlists operation inputs
{
id: 'channelId',
@@ -260,6 +325,13 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
integer: true,
condition: { field: 'operation', value: 'youtube_channel_playlists' },
},
{
id: 'pageToken',
title: 'Page Token',
type: 'short-input',
placeholder: 'Token for pagination (from nextPageToken)',
condition: { field: 'operation', value: 'youtube_channel_playlists' },
},
// Get Playlist Items operation inputs
{
id: 'playlistId',
@@ -279,6 +351,13 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
integer: true,
condition: { field: 'operation', value: 'youtube_playlist_items' },
},
{
id: 'pageToken',
title: 'Page Token',
type: 'short-input',
placeholder: 'Token for pagination (from nextPageToken)',
condition: { field: 'operation', value: 'youtube_playlist_items' },
},
// Get Video Comments operation inputs
{
id: 'videoId',
@@ -309,6 +388,13 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
value: () => 'relevance',
condition: { field: 'operation', value: 'youtube_comments' },
},
{
id: 'pageToken',
title: 'Page Token',
type: 'short-input',
placeholder: 'Token for pagination (from nextPageToken)',
condition: { field: 'operation', value: 'youtube_comments' },
},
// API Key (common to all operations)
{
id: 'apiKey',
@@ -321,13 +407,15 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
],
tools: {
access: [
'youtube_search',
'youtube_video_details',
'youtube_channel_info',
'youtube_channel_videos',
'youtube_channel_playlists',
'youtube_playlist_items',
'youtube_channel_videos',
'youtube_comments',
'youtube_playlist_items',
'youtube_search',
'youtube_trending',
'youtube_video_categories',
'youtube_video_details',
],
config: {
tool: (params) => {
@@ -339,8 +427,12 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
switch (params.operation) {
case 'youtube_search':
return 'youtube_search'
case 'youtube_trending':
return 'youtube_trending'
case 'youtube_video_details':
return 'youtube_video_details'
case 'youtube_video_categories':
return 'youtube_video_categories'
case 'youtube_channel_info':
return 'youtube_channel_info'
case 'youtube_channel_videos':
@@ -363,6 +455,7 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
// Search Videos
query: { type: 'string', description: 'Search query' },
maxResults: { type: 'number', description: 'Maximum number of results' },
pageToken: { type: 'string', description: 'Page token for pagination' },
// Search Filters
publishedAfter: { type: 'string', description: 'Published after date (RFC 3339)' },
publishedBefore: { type: 'string', description: 'Published before date (RFC 3339)' },
@@ -370,9 +463,11 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
videoCategoryId: { type: 'string', description: 'YouTube category ID' },
videoDefinition: { type: 'string', description: 'Video quality filter' },
videoCaption: { type: 'string', description: 'Caption availability filter' },
eventType: { type: 'string', description: 'Live stream filter (live/upcoming/completed)' },
regionCode: { type: 'string', description: 'Region code (ISO 3166-1)' },
relevanceLanguage: { type: 'string', description: 'Language code (ISO 639-1)' },
safeSearch: { type: 'string', description: 'Safe search level' },
hl: { type: 'string', description: 'Language for category names' },
// Video Details & Comments
videoId: { type: 'string', description: 'YouTube video ID' },
// Channel Info
@@ -384,7 +479,7 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
order: { type: 'string', description: 'Sort order' },
},
outputs: {
// Search Videos & Playlist Items
// Search Videos, Trending, Playlist Items, Captions, Categories
items: { type: 'json', description: 'List of items returned' },
totalResults: { type: 'number', description: 'Total number of results' },
nextPageToken: { type: 'string', description: 'Token for next page' },
@@ -399,11 +494,33 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
viewCount: { type: 'number', description: 'View count' },
likeCount: { type: 'number', description: 'Like count' },
commentCount: { type: 'number', description: 'Comment count' },
favoriteCount: { type: 'number', description: 'Favorite count' },
thumbnail: { type: 'string', description: 'Thumbnail URL' },
tags: { type: 'json', description: 'Video tags' },
categoryId: { type: 'string', description: 'Video category ID' },
definition: { type: 'string', description: 'Video definition (hd/sd)' },
caption: { type: 'string', description: 'Has captions (true/false)' },
licensedContent: { type: 'boolean', description: 'Is licensed content' },
privacyStatus: { type: 'string', description: 'Privacy status' },
liveBroadcastContent: { type: 'string', description: 'Live broadcast status' },
defaultLanguage: { type: 'string', description: 'Default language' },
defaultAudioLanguage: { type: 'string', description: 'Default audio language' },
// Live Streaming Details
isLiveContent: { type: 'boolean', description: 'Whether video is/was a live stream' },
scheduledStartTime: { type: 'string', description: 'Scheduled start time for live streams' },
actualStartTime: { type: 'string', description: 'Actual start time of live stream' },
actualEndTime: { type: 'string', description: 'End time of live stream' },
concurrentViewers: { type: 'number', description: 'Current viewers (live only)' },
activeLiveChatId: { type: 'string', description: 'Live chat ID' },
// Channel Info
subscriberCount: { type: 'number', description: 'Subscriber count' },
videoCount: { type: 'number', description: 'Total video count' },
customUrl: { type: 'string', description: 'Channel custom URL' },
country: { type: 'string', description: 'Channel country' },
uploadsPlaylistId: { type: 'string', description: 'Uploads playlist ID' },
bannerImageUrl: { type: 'string', description: 'Channel banner URL' },
hiddenSubscriberCount: { type: 'boolean', description: 'Is subscriber count hidden' },
// Video Categories
assignable: { type: 'boolean', description: 'Whether category can be assigned' },
},
}

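The new `pageToken` inputs follow the YouTube Data API's cursor pagination: each response may carry a `nextPageToken`, which the next request passes back until it is absent. A hedged sketch of the consuming loop (`fetchPage` is a stand-in for the real API call, e.g. `search.list` with the `pageToken` parameter):

```typescript
// Cursor pagination: pass the previous response's nextPageToken until the
// API stops returning one. fetchPage stands in for the actual API request.
interface Page<T> {
  items: T[]
  nextPageToken?: string
}

async function collectAllPages<T>(
  fetchPage: (pageToken?: string) => Promise<Page<T>>,
  maxPages = 10 // safety cap to bound API quota usage
): Promise<T[]> {
  const all: T[] = []
  let token: string | undefined
  for (let i = 0; i < maxPages; i++) {
    const page = await fetchPage(token)
    all.push(...page.items)
    if (!page.nextPageToken) break // last page reached
    token = page.nextPageToken
  }
  return all
}
```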

@@ -6,6 +6,7 @@ interface ChildWorkflowErrorOptions {
childWorkflowName: string
childTraceSpans?: TraceSpan[]
executionResult?: ExecutionResult
childWorkflowSnapshotId?: string
cause?: Error
}
@@ -16,6 +17,7 @@ export class ChildWorkflowError extends Error {
readonly childTraceSpans: TraceSpan[]
readonly childWorkflowName: string
readonly executionResult?: ExecutionResult
readonly childWorkflowSnapshotId?: string
constructor(options: ChildWorkflowErrorOptions) {
super(options.message, { cause: options.cause })
@@ -23,6 +25,7 @@ export class ChildWorkflowError extends Error {
this.childWorkflowName = options.childWorkflowName
this.childTraceSpans = options.childTraceSpans ?? []
this.executionResult = options.executionResult
this.childWorkflowSnapshotId = options.childWorkflowSnapshotId
}
static isChildWorkflowError(error: unknown): error is ChildWorkflowError {

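The `error is ChildWorkflowError` predicate above lets `catch` blocks narrow an `unknown` error before reading optional fields like the new snapshot id. A self-contained sketch of the pattern (field names follow the diff; the guard body is assumed to be an `instanceof` check):

```typescript
// Custom error with a static type-guard: narrowing `unknown` in catch blocks
// makes optional fields like the snapshot id safe to read.
class ChildWorkflowError extends Error {
  readonly childWorkflowSnapshotId?: string
  constructor(message: string, snapshotId?: string) {
    super(message)
    this.name = 'ChildWorkflowError'
    this.childWorkflowSnapshotId = snapshotId
  }
  static isChildWorkflowError(error: unknown): error is ChildWorkflowError {
    return error instanceof ChildWorkflowError
  }
}

function snapshotOf(error: unknown): string | undefined {
  return ChildWorkflowError.isChildWorkflowError(error)
    ? error.childWorkflowSnapshotId // narrowed: safe property access
    : undefined
}
```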

@@ -237,6 +237,9 @@ export class BlockExecutor {
if (ChildWorkflowError.isChildWorkflowError(error)) {
errorOutput.childTraceSpans = error.childTraceSpans
errorOutput.childWorkflowName = error.childWorkflowName
if (error.childWorkflowSnapshotId) {
errorOutput.childWorkflowSnapshotId = error.childWorkflowSnapshotId
}
}
this.state.setBlockOutput(node.id, errorOutput, duration)


@@ -2417,4 +2417,177 @@ describe('EdgeManager', () => {
expect(successReady).toContain(targetId)
})
})
describe('Condition with loop downstream - deactivation propagation', () => {
it('should deactivate nodes after loop when condition branch containing loop is deactivated', () => {
// Scenario: condition → (if) → sentinel_start → loopBody → sentinel_end → (loop_exit) → after_loop
// → (else) → other_branch
// When condition takes "else" path, the entire if-branch including nodes after the loop should be deactivated
const conditionId = 'condition'
const sentinelStartId = 'sentinel-start'
const loopBodyId = 'loop-body'
const sentinelEndId = 'sentinel-end'
const afterLoopId = 'after-loop'
const otherBranchId = 'other-branch'
const conditionNode = createMockNode(conditionId, [
{ target: sentinelStartId, sourceHandle: 'condition-if' },
{ target: otherBranchId, sourceHandle: 'condition-else' },
])
const sentinelStartNode = createMockNode(
sentinelStartId,
[{ target: loopBodyId }],
[conditionId]
)
const loopBodyNode = createMockNode(
loopBodyId,
[{ target: sentinelEndId }],
[sentinelStartId]
)
const sentinelEndNode = createMockNode(
sentinelEndId,
[
{ target: sentinelStartId, sourceHandle: 'loop_continue' },
{ target: afterLoopId, sourceHandle: 'loop_exit' },
],
[loopBodyId]
)
const afterLoopNode = createMockNode(afterLoopId, [], [sentinelEndId])
const otherBranchNode = createMockNode(otherBranchId, [], [conditionId])
const nodes = new Map<string, DAGNode>([
[conditionId, conditionNode],
[sentinelStartId, sentinelStartNode],
[loopBodyId, loopBodyNode],
[sentinelEndId, sentinelEndNode],
[afterLoopId, afterLoopNode],
[otherBranchId, otherBranchNode],
])
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
// Condition selects "else" branch, deactivating the "if" branch (which contains the loop)
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'else' })
// Only otherBranch should be ready
expect(readyNodes).toContain(otherBranchId)
expect(readyNodes).not.toContain(sentinelStartId)
// afterLoop should NOT be ready - its incoming edge from sentinel_end should be deactivated
expect(readyNodes).not.toContain(afterLoopId)
// Verify that countActiveIncomingEdges returns 0 for afterLoop
// (meaning the loop_exit edge was properly deactivated)
// Note: isNodeReady returns true when all edges are deactivated (no pending deps),
// but the node won't be in readyNodes since it wasn't reached via an active path
expect(edgeManager.isNodeReady(afterLoopNode)).toBe(true) // All edges deactivated = no blocking deps
})
it('should deactivate nodes after parallel when condition branch containing parallel is deactivated', () => {
// Similar scenario with parallel instead of loop
const conditionId = 'condition'
const parallelStartId = 'parallel-start'
const parallelBodyId = 'parallel-body'
const parallelEndId = 'parallel-end'
const afterParallelId = 'after-parallel'
const otherBranchId = 'other-branch'
const conditionNode = createMockNode(conditionId, [
{ target: parallelStartId, sourceHandle: 'condition-if' },
{ target: otherBranchId, sourceHandle: 'condition-else' },
])
const parallelStartNode = createMockNode(
parallelStartId,
[{ target: parallelBodyId }],
[conditionId]
)
const parallelBodyNode = createMockNode(
parallelBodyId,
[{ target: parallelEndId }],
[parallelStartId]
)
const parallelEndNode = createMockNode(
parallelEndId,
[{ target: afterParallelId, sourceHandle: 'parallel_exit' }],
[parallelBodyId]
)
const afterParallelNode = createMockNode(afterParallelId, [], [parallelEndId])
const otherBranchNode = createMockNode(otherBranchId, [], [conditionId])
const nodes = new Map<string, DAGNode>([
[conditionId, conditionNode],
[parallelStartId, parallelStartNode],
[parallelBodyId, parallelBodyNode],
[parallelEndId, parallelEndNode],
[afterParallelId, afterParallelNode],
[otherBranchId, otherBranchNode],
])
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
// Condition selects "else" branch
const readyNodes = edgeManager.processOutgoingEdges(conditionNode, { selectedOption: 'else' })
expect(readyNodes).toContain(otherBranchId)
expect(readyNodes).not.toContain(parallelStartId)
expect(readyNodes).not.toContain(afterParallelId)
// isNodeReady returns true when all edges are deactivated (no pending deps)
expect(edgeManager.isNodeReady(afterParallelNode)).toBe(true)
})
it('should still correctly handle normal loop exit (not deactivate when loop runs)', () => {
// When a loop actually executes and exits normally, after_loop should become ready
const sentinelStartId = 'sentinel-start'
const loopBodyId = 'loop-body'
const sentinelEndId = 'sentinel-end'
const afterLoopId = 'after-loop'
const sentinelStartNode = createMockNode(sentinelStartId, [{ target: loopBodyId }])
const loopBodyNode = createMockNode(
loopBodyId,
[{ target: sentinelEndId }],
[sentinelStartId]
)
const sentinelEndNode = createMockNode(
sentinelEndId,
[
{ target: sentinelStartId, sourceHandle: 'loop_continue' },
{ target: afterLoopId, sourceHandle: 'loop_exit' },
],
[loopBodyId]
)
const afterLoopNode = createMockNode(afterLoopId, [], [sentinelEndId])
const nodes = new Map<string, DAGNode>([
[sentinelStartId, sentinelStartNode],
[loopBodyId, loopBodyNode],
[sentinelEndId, sentinelEndNode],
[afterLoopId, afterLoopNode],
])
const dag = createMockDAG(nodes)
const edgeManager = new EdgeManager(dag)
// Simulate sentinel_end completing with loop_exit (loop is done)
const readyNodes = edgeManager.processOutgoingEdges(sentinelEndNode, {
selectedRoute: 'loop_exit',
})
// afterLoop should be ready
expect(readyNodes).toContain(afterLoopId)
})
})
})


@@ -243,7 +243,7 @@ export class EdgeManager {
}
for (const [, outgoingEdge] of targetNode.outgoingEdges) {
if (!this.isControlEdge(outgoingEdge.sourceHandle)) {
if (!this.isBackwardsEdge(outgoingEdge.sourceHandle)) {
this.deactivateEdgeAndDescendants(
targetId,
outgoingEdge.target,

View File

@@ -198,6 +198,7 @@ describe('WorkflowBlockHandler', () => {
expect(result).toEqual({
success: true,
childWorkflowId: 'child-id',
childWorkflowName: 'Child Workflow',
result: { data: 'test result' },
childTraceSpans: [],
@@ -235,6 +236,7 @@ describe('WorkflowBlockHandler', () => {
expect(result).toEqual({
success: true,
childWorkflowId: 'child-id',
childWorkflowName: 'Child Workflow',
result: { nested: 'data' },
childTraceSpans: [],

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import type { TraceSpan } from '@/lib/logs/types'
import type { BlockOutput } from '@/blocks/types'
@@ -57,6 +58,7 @@ export class WorkflowBlockHandler implements BlockHandler {
const workflowMetadata = workflows[workflowId]
let childWorkflowName = workflowMetadata?.name || workflowId
let childWorkflowSnapshotId: string | undefined
try {
const currentDepth = (ctx.workflowId?.split('_sub_').length || 1) - 1
if (currentDepth >= DEFAULTS.MAX_WORKFLOW_DEPTH) {
@@ -107,6 +109,12 @@ export class WorkflowBlockHandler implements BlockHandler {
childWorkflowInput = inputs.input
}
const childSnapshotResult = await snapshotService.createSnapshotWithDeduplication(
workflowId,
childWorkflow.workflowState
)
childWorkflowSnapshotId = childSnapshotResult.snapshot.id
const subExecutor = new Executor({
workflow: childWorkflow.serializedState,
workflowInput: childWorkflowInput,
@@ -139,7 +147,8 @@ export class WorkflowBlockHandler implements BlockHandler {
workflowId,
childWorkflowName,
duration,
-childTraceSpans
+childTraceSpans,
+childWorkflowSnapshotId
)
return mappedResult
@@ -172,6 +181,7 @@ export class WorkflowBlockHandler implements BlockHandler {
childWorkflowName,
childTraceSpans,
executionResult,
childWorkflowSnapshotId,
cause: error instanceof Error ? error : undefined,
})
}
@@ -279,6 +289,10 @@ export class WorkflowBlockHandler implements BlockHandler {
)
const workflowVariables = (workflowData.variables as Record<string, any>) || {}
const workflowStateWithVariables = {
...workflowState,
variables: workflowVariables,
}
if (Object.keys(workflowVariables).length > 0) {
logger.info(
@@ -290,6 +304,7 @@ export class WorkflowBlockHandler implements BlockHandler {
name: workflowData.name,
serializedState: serializedWorkflow,
variables: workflowVariables,
workflowState: workflowStateWithVariables,
rawBlocks: workflowState.blocks,
}
}
@@ -358,11 +373,16 @@ export class WorkflowBlockHandler implements BlockHandler {
)
const workflowVariables = (wfData?.variables as Record<string, any>) || {}
const workflowStateWithVariables = {
...deployedState,
variables: workflowVariables,
}
return {
name: wfData?.name || DEFAULTS.WORKFLOW_NAME,
serializedState: serializedWorkflow,
variables: workflowVariables,
workflowState: workflowStateWithVariables,
rawBlocks: deployedState.blocks,
}
}
@@ -504,7 +524,8 @@ export class WorkflowBlockHandler implements BlockHandler {
childWorkflowId: string,
childWorkflowName: string,
duration: number,
-childTraceSpans?: WorkflowTraceSpan[]
+childTraceSpans?: WorkflowTraceSpan[],
+childWorkflowSnapshotId?: string
): BlockOutput {
const success = childResult.success !== false
const result = childResult.output || {}
@@ -515,12 +536,15 @@ export class WorkflowBlockHandler implements BlockHandler {
message: `"${childWorkflowName}" failed: ${childResult.error || 'Child workflow execution failed'}`,
childWorkflowName,
childTraceSpans: childTraceSpans || [],
childWorkflowSnapshotId,
})
}
return {
success: true,
childWorkflowName,
childWorkflowId,
...(childWorkflowSnapshotId ? { childWorkflowSnapshotId } : {}),
result,
childTraceSpans: childTraceSpans || [],
} as Record<string, any>
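The depth guard near the top of this handler derives nesting depth from the execution's workflow id, where each child execution appends a `_sub_` segment. A hypothetical standalone sketch of that convention (the constant name and limit are assumptions; the real value lives in `DEFAULTS.MAX_WORKFLOW_DEPTH`):

```typescript
// Sketch of the nesting-depth guard: child executions append "_sub_" segments
// to the workflow id, so depth = segment count - 1. An undefined id counts as
// depth 0 (a top-level execution).
const MAX_WORKFLOW_DEPTH = 10 // assumed limit for illustration

function workflowDepth(workflowId: string | undefined): number {
  return (workflowId?.split('_sub_').length ?? 1) - 1
}

function canSpawnChild(workflowId: string | undefined): boolean {
  return workflowDepth(workflowId) < MAX_WORKFLOW_DEPTH
}
```

This makes the guard cheap to evaluate: no registry lookup is needed, since the id itself encodes how many ancestors spawned the current execution.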

View File

@@ -210,6 +210,7 @@ export interface ExecutionSnapshotData {
executionId: string
workflowId: string
workflowState: Record<string, unknown>
childWorkflowSnapshots?: Record<string, Record<string, unknown>>
executionMetadata: {
trigger: string
startedAt: string

View File

@@ -119,8 +119,6 @@ export function useCollaborativeWorkflow() {
onWorkflowOperation,
onSubblockUpdate,
onVariableUpdate,
-onUserJoined,
-onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
onOperationConfirmed,
@@ -484,14 +482,6 @@ export function useCollaborativeWorkflow() {
}
}
-const handleUserJoined = (data: any) => {
-logger.info(`User joined: ${data.userName}`)
-}
-const handleUserLeft = (data: any) => {
-logger.info(`User left: ${data.userId}`)
-}
const handleWorkflowDeleted = (data: any) => {
const { workflowId } = data
logger.warn(`Workflow ${workflowId} has been deleted`)
@@ -600,26 +590,17 @@ export function useCollaborativeWorkflow() {
failOperation(operationId, retryable)
}
// Register event handlers
onWorkflowOperation(handleWorkflowOperation)
onSubblockUpdate(handleSubblockUpdate)
onVariableUpdate(handleVariableUpdate)
-onUserJoined(handleUserJoined)
-onUserLeft(handleUserLeft)
onWorkflowDeleted(handleWorkflowDeleted)
onWorkflowReverted(handleWorkflowReverted)
onOperationConfirmed(handleOperationConfirmed)
onOperationFailed(handleOperationFailed)
return () => {
// Cleanup handled by socket context
}
}, [
onWorkflowOperation,
onSubblockUpdate,
onVariableUpdate,
-onUserJoined,
-onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
onOperationConfirmed,

View File

@@ -10,6 +10,7 @@ import {
type KnowledgeBaseArgs,
} from '@/lib/copilot/tools/shared/schemas'
import { useCopilotStore } from '@/stores/panel/copilot/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
/**
* Client tool for knowledge base operations
@@ -102,7 +103,19 @@ export class KnowledgeBaseClientTool extends BaseClientTool {
const logger = createLogger('KnowledgeBaseClientTool')
try {
this.setState(ClientToolCallState.executing)
-const payload: KnowledgeBaseArgs = { ...(args || { operation: 'list' }) }
+// Get the workspace ID from the workflow registry hydration state
+const { hydration } = useWorkflowRegistry.getState()
+const workspaceId = hydration.workspaceId
+// Build payload with workspace ID included in args
+const payload: KnowledgeBaseArgs = {
+...(args || { operation: 'list' }),
+args: {
+...(args?.args || {}),
+workspaceId: workspaceId || undefined,
+},
+}
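The payload construction above can be sketched as a pure function: the workspace id fetched from the registry is merged into the tool args without clobbering anything the caller already supplied. This is an illustrative model only (the `KBArgs` shape is an assumption standing in for `KnowledgeBaseArgs`):

```typescript
// Sketch of the payload merge: caller-provided args win for everything except
// workspaceId, which is layered in from the registry state.
interface KBArgs {
  operation: string
  args?: Record<string, unknown>
}

function buildPayload(args: KBArgs | undefined, workspaceId?: string): KBArgs {
  return {
    ...(args || { operation: 'list' }),
    args: {
      ...(args?.args || {}),
      workspaceId: workspaceId || undefined,
    },
  }
}
```

Defaulting to `{ operation: 'list' }` preserves the previous behavior when the tool is invoked with no arguments at all.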
const res = await fetch('/api/copilot/execute-copilot-server-tool', {
method: 'POST',

View File

@@ -2508,6 +2508,10 @@ async function validateWorkflowSelectorIds(
for (const subBlockConfig of blockConfig.subBlocks) {
if (!SELECTOR_TYPES.has(subBlockConfig.type)) continue
// Skip oauth-input - credentials are pre-validated before edit application
// This allows existing collaborator credentials to remain untouched
if (subBlockConfig.type === 'oauth-input') continue
const subBlockValue = blockData.subBlocks?.[subBlockConfig.id]?.value
if (!subBlockValue) continue
@@ -2573,6 +2577,295 @@ async function validateWorkflowSelectorIds(
return errors
}
/**
* Pre-validates credential and apiKey inputs in operations before they are applied.
* - Validates oauth-input (credential) IDs belong to the user
* - Filters out apiKey inputs for hosted models when isHosted is true
* - Also validates credentials and apiKeys in nestedNodes (blocks inside loop/parallel)
* Returns validation errors for any removed inputs.
*/
async function preValidateCredentialInputs(
operations: EditWorkflowOperation[],
context: { userId: string },
workflowState?: Record<string, unknown>
): Promise<{ filteredOperations: EditWorkflowOperation[]; errors: ValidationError[] }> {
const { isHosted } = await import('@/lib/core/config/feature-flags')
const { getHostedModels } = await import('@/providers/utils')
const logger = createLogger('PreValidateCredentials')
const errors: ValidationError[] = []
// Collect credential and apiKey inputs that need validation/filtering
const credentialInputs: Array<{
operationIndex: number
blockId: string
blockType: string
fieldName: string
value: string
nestedBlockId?: string
}> = []
const hostedApiKeyInputs: Array<{
operationIndex: number
blockId: string
blockType: string
model: string
nestedBlockId?: string
}> = []
const hostedModelsLower = isHosted ? new Set(getHostedModels().map((m) => m.toLowerCase())) : null
/**
* Collect credential inputs from a block's inputs based on its block config
*/
function collectCredentialInputs(
blockConfig: ReturnType<typeof getBlock>,
inputs: Record<string, unknown>,
opIndex: number,
blockId: string,
blockType: string,
nestedBlockId?: string
) {
if (!blockConfig) return
for (const subBlockConfig of blockConfig.subBlocks) {
if (subBlockConfig.type !== 'oauth-input') continue
const inputValue = inputs[subBlockConfig.id]
if (!inputValue || typeof inputValue !== 'string' || inputValue.trim() === '') continue
credentialInputs.push({
operationIndex: opIndex,
blockId,
blockType,
fieldName: subBlockConfig.id,
value: inputValue,
nestedBlockId,
})
}
}
/**
* Check if apiKey should be filtered for a block with the given model
*/
function collectHostedApiKeyInput(
inputs: Record<string, unknown>,
modelValue: string | undefined,
opIndex: number,
blockId: string,
blockType: string,
nestedBlockId?: string
) {
if (!hostedModelsLower || !inputs.apiKey) return
if (!modelValue || typeof modelValue !== 'string') return
if (hostedModelsLower.has(modelValue.toLowerCase())) {
hostedApiKeyInputs.push({
operationIndex: opIndex,
blockId,
blockType,
model: modelValue,
nestedBlockId,
})
}
}
operations.forEach((op, opIndex) => {
// Process main block inputs
if (op.params?.inputs && op.params?.type) {
const blockConfig = getBlock(op.params.type)
if (blockConfig) {
// Collect credentials from main block
collectCredentialInputs(
blockConfig,
op.params.inputs as Record<string, unknown>,
opIndex,
op.block_id,
op.params.type
)
// Check for apiKey inputs on hosted models
let modelValue = (op.params.inputs as Record<string, unknown>).model as string | undefined
// For edit operations, if model is not being changed, check existing block's model
if (
!modelValue &&
op.operation_type === 'edit' &&
(op.params.inputs as Record<string, unknown>).apiKey &&
workflowState
) {
const existingBlock = (workflowState.blocks as Record<string, unknown>)?.[op.block_id] as
| Record<string, unknown>
| undefined
const existingSubBlocks = existingBlock?.subBlocks as Record<string, unknown> | undefined
const existingModelSubBlock = existingSubBlocks?.model as
| Record<string, unknown>
| undefined
modelValue = existingModelSubBlock?.value as string | undefined
}
collectHostedApiKeyInput(
op.params.inputs as Record<string, unknown>,
modelValue,
opIndex,
op.block_id,
op.params.type
)
}
}
// Process nested nodes (blocks inside loop/parallel containers)
const nestedNodes = op.params?.nestedNodes as
| Record<string, Record<string, unknown>>
| undefined
if (nestedNodes) {
Object.entries(nestedNodes).forEach(([childId, childBlock]) => {
const childType = childBlock.type as string | undefined
const childInputs = childBlock.inputs as Record<string, unknown> | undefined
if (!childType || !childInputs) return
const childBlockConfig = getBlock(childType)
if (!childBlockConfig) return
// Collect credentials from nested block
collectCredentialInputs(
childBlockConfig,
childInputs,
opIndex,
op.block_id,
childType,
childId
)
// Check for apiKey inputs on hosted models in nested block
const modelValue = childInputs.model as string | undefined
collectHostedApiKeyInput(childInputs, modelValue, opIndex, op.block_id, childType, childId)
})
}
})
const hasCredentialsToValidate = credentialInputs.length > 0
const hasHostedApiKeysToFilter = hostedApiKeyInputs.length > 0
if (!hasCredentialsToValidate && !hasHostedApiKeysToFilter) {
return { filteredOperations: operations, errors }
}
// Deep clone operations so we can modify them
const filteredOperations = structuredClone(operations)
// Filter out apiKey inputs for hosted models and add validation errors
if (hasHostedApiKeysToFilter) {
logger.info('Filtering apiKey inputs for hosted models', { count: hostedApiKeyInputs.length })
for (const apiKeyInput of hostedApiKeyInputs) {
const op = filteredOperations[apiKeyInput.operationIndex]
// Handle nested block apiKey filtering
if (apiKeyInput.nestedBlockId) {
const nestedNodes = op.params?.nestedNodes as
| Record<string, Record<string, unknown>>
| undefined
const nestedBlock = nestedNodes?.[apiKeyInput.nestedBlockId]
const nestedInputs = nestedBlock?.inputs as Record<string, unknown> | undefined
if (nestedInputs?.apiKey) {
nestedInputs.apiKey = undefined
logger.debug('Filtered apiKey for hosted model in nested block', {
parentBlockId: apiKeyInput.blockId,
nestedBlockId: apiKeyInput.nestedBlockId,
model: apiKeyInput.model,
})
errors.push({
blockId: apiKeyInput.nestedBlockId,
blockType: apiKeyInput.blockType,
field: 'apiKey',
value: '[redacted]',
error: `Cannot set API key for hosted model "${apiKeyInput.model}" - API keys are managed by the platform when using hosted models`,
})
}
} else if (op.params?.inputs?.apiKey) {
// Handle main block apiKey filtering
op.params.inputs.apiKey = undefined
logger.debug('Filtered apiKey for hosted model', {
blockId: apiKeyInput.blockId,
model: apiKeyInput.model,
})
errors.push({
blockId: apiKeyInput.blockId,
blockType: apiKeyInput.blockType,
field: 'apiKey',
value: '[redacted]',
error: `Cannot set API key for hosted model "${apiKeyInput.model}" - API keys are managed by the platform when using hosted models`,
})
}
}
}
// Validate credential inputs
if (hasCredentialsToValidate) {
logger.info('Pre-validating credential inputs', {
credentialCount: credentialInputs.length,
userId: context.userId,
})
const allCredentialIds = credentialInputs.map((c) => c.value)
const validationResult = await validateSelectorIds('oauth-input', allCredentialIds, context)
const invalidSet = new Set(validationResult.invalid)
if (invalidSet.size > 0) {
for (const credInput of credentialInputs) {
if (!invalidSet.has(credInput.value)) continue
const op = filteredOperations[credInput.operationIndex]
// Handle nested block credential removal
if (credInput.nestedBlockId) {
const nestedNodes = op.params?.nestedNodes as
| Record<string, Record<string, unknown>>
| undefined
const nestedBlock = nestedNodes?.[credInput.nestedBlockId]
const nestedInputs = nestedBlock?.inputs as Record<string, unknown> | undefined
if (nestedInputs?.[credInput.fieldName]) {
delete nestedInputs[credInput.fieldName]
logger.info('Removed invalid credential from nested block', {
parentBlockId: credInput.blockId,
nestedBlockId: credInput.nestedBlockId,
field: credInput.fieldName,
invalidValue: credInput.value,
})
}
} else if (op.params?.inputs?.[credInput.fieldName]) {
// Handle main block credential removal
delete op.params.inputs[credInput.fieldName]
logger.info('Removed invalid credential from operation', {
blockId: credInput.blockId,
field: credInput.fieldName,
invalidValue: credInput.value,
})
}
const warningInfo = validationResult.warning ? `. ${validationResult.warning}` : ''
const errorBlockId = credInput.nestedBlockId ?? credInput.blockId
errors.push({
blockId: errorBlockId,
blockType: credInput.blockType,
field: credInput.fieldName,
value: credInput.value,
error: `Invalid credential ID "${credInput.value}" - credential does not exist or user doesn't have access${warningInfo}`,
})
}
logger.warn('Filtered out invalid credentials', {
invalidCount: invalidSet.size,
})
}
}
return { filteredOperations, errors }
}
async function getCurrentWorkflowStateFromDb(
workflowId: string
): Promise<{ workflowState: any; subBlockValues: Record<string, Record<string, any>> }> {
@@ -2657,12 +2950,29 @@ export const editWorkflowServerTool: BaseServerTool<EditWorkflowParams, any> = {
// Get permission config for the user
const permissionConfig = context?.userId ? await getUserPermissionConfig(context.userId) : null
// Pre-validate credential and apiKey inputs before applying operations
// This filters out invalid credentials and apiKeys for hosted models
let operationsToApply = operations
const credentialErrors: ValidationError[] = []
if (context?.userId) {
const { filteredOperations, errors: credErrors } = await preValidateCredentialInputs(
operations,
{ userId: context.userId },
workflowState
)
operationsToApply = filteredOperations
credentialErrors.push(...credErrors)
}
// Apply operations directly to the workflow state
const {
state: modifiedWorkflowState,
validationErrors,
skippedItems,
-} = applyOperationsToWorkflowState(workflowState, operations, permissionConfig)
+} = applyOperationsToWorkflowState(workflowState, operationsToApply, permissionConfig)
// Add credential validation errors
validationErrors.push(...credentialErrors)
// Get workspaceId for selector validation
let workspaceId: string | undefined
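The hosted-model apiKey filtering in `preValidateCredentialInputs` reduces to a small predicate: an `apiKey` input is dropped whenever the block's selected model is one the platform hosts, compared case-insensitively. A standalone sketch (the model names here are placeholders, not the real hosted-model list, which comes from `getHostedModels()`):

```typescript
// Sketch of the hosted-model apiKey check: filter only when an apiKey is
// actually present AND the model resolves to a hosted one (case-insensitive).
const hostedModels = new Set(
  ['gpt-4o', 'claude-sonnet'].map((m) => m.toLowerCase()) // placeholder list
)

function shouldFilterApiKey(inputs: Record<string, unknown>, model?: string): boolean {
  if (!inputs.apiKey) return false
  if (!model || typeof model !== 'string') return false
  return hostedModels.has(model.toLowerCase())
}
```

Note the order of checks mirrors the collection logic above: a missing `apiKey` or missing model short-circuits before any set lookup, so edit operations that touch neither field pay no cost.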

View File

@@ -50,6 +50,8 @@ function prepareLogData(
export async function emitWorkflowExecutionCompleted(log: WorkflowExecutionLog): Promise<void> {
try {
if (!log.workflowId) return
const workflowData = await db
.select({ workspaceId: workflow.workspaceId })
.from(workflow)

View File

@@ -293,7 +293,10 @@ export class ExecutionLogger implements IExecutionLoggerService {
}
try {
-const [wf] = await db.select().from(workflow).where(eq(workflow.id, updatedLog.workflowId))
+// Skip workflow lookup if workflow was deleted
+const wf = updatedLog.workflowId
+? (await db.select().from(workflow).where(eq(workflow.id, updatedLog.workflowId)))[0]
+: undefined
if (wf) {
const [usr] = await db
.select({ id: userTable.id, email: userTable.email, name: userTable.name })
@@ -461,7 +464,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
* Maintains same logic as original execution logger for billing consistency
*/
private async updateUserStats(
-workflowId: string,
+workflowId: string | null,
costSummary: {
totalCost: number
totalInputCost: number
@@ -494,6 +497,11 @@ export class ExecutionLogger implements IExecutionLoggerService {
return
}
if (!workflowId) {
logger.debug('Workflow was deleted, skipping user stats update')
return
}
try {
// Get the workflow record to get the userId
const [workflowRecord] = await db

View File

@@ -1,8 +1,8 @@
import { createHash } from 'crypto'
import { db } from '@sim/db'
-import { workflowExecutionSnapshots } from '@sim/db/schema'
+import { workflowExecutionLogs, workflowExecutionSnapshots } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
-import { and, eq, lt } from 'drizzle-orm'
+import { and, eq, lt, notExists } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import type {
SnapshotService as ISnapshotService,
@@ -121,7 +121,17 @@ export class SnapshotService implements ISnapshotService {
const deletedSnapshots = await db
.delete(workflowExecutionSnapshots)
-.where(lt(workflowExecutionSnapshots.createdAt, cutoffDate))
+.where(
+and(
+lt(workflowExecutionSnapshots.createdAt, cutoffDate),
+notExists(
+db
+.select({ id: workflowExecutionLogs.id })
+.from(workflowExecutionLogs)
+.where(eq(workflowExecutionLogs.stateSnapshotId, workflowExecutionSnapshots.id))
+)
+)
+)
.returning({ id: workflowExecutionSnapshots.id })
const deletedCount = deletedSnapshots.length
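The semantics of the new cleanup query can be modeled on plain data: a snapshot is eligible for deletion only when it is older than the cutoff and no execution log still references it via `stateSnapshotId`. A hypothetical in-memory sketch of that rule (the real code expresses it as a Drizzle `notExists` subquery so the database does the filtering):

```typescript
// In-memory model of the cleanup rule: old AND unreferenced.
interface Snapshot {
  id: string
  createdAt: number
}
interface ExecLog {
  stateSnapshotId: string
}

function snapshotsToDelete(snapshots: Snapshot[], logs: ExecLog[], cutoff: number): string[] {
  const referenced = new Set(logs.map((l) => l.stateSnapshotId))
  return snapshots
    .filter((s) => s.createdAt < cutoff && !referenced.has(s.id))
    .map((s) => s.id)
}
```

This is the fix's point: the previous version deleted on age alone, which could orphan logs whose `stateSnapshotId` pointed at a deleted row.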

View File

@@ -112,6 +112,26 @@ export function buildTraceSpans(result: ExecutionResult): {
const duration = log.durationMs || 0
let output = log.output || {}
let childWorkflowSnapshotId: string | undefined
let childWorkflowId: string | undefined
if (output && typeof output === 'object') {
const outputRecord = output as Record<string, unknown>
childWorkflowSnapshotId =
typeof outputRecord.childWorkflowSnapshotId === 'string'
? outputRecord.childWorkflowSnapshotId
: undefined
childWorkflowId =
typeof outputRecord.childWorkflowId === 'string' ? outputRecord.childWorkflowId : undefined
if (childWorkflowSnapshotId || childWorkflowId) {
const {
childWorkflowSnapshotId: _childSnapshotId,
childWorkflowId: _childWorkflowId,
...outputRest
} = outputRecord
output = outputRest
}
}
if (log.error) {
output = {
@@ -134,6 +154,8 @@ export function buildTraceSpans(result: ExecutionResult): {
blockId: log.blockId,
input: log.input || {},
output: output,
...(childWorkflowSnapshotId ? { childWorkflowSnapshotId } : {}),
...(childWorkflowId ? { childWorkflowId } : {}),
...(log.loopId && { loopId: log.loopId }),
...(log.parallelId && { parallelId: log.parallelId }),
...(log.iterationIndex !== undefined && { iterationIndex: log.iterationIndex }),
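The output-splitting step above lifts child-workflow metadata off the block output and onto the trace span itself, removing it from the output so it is not duplicated. A condensed sketch of that destructure-and-narrow pattern:

```typescript
// Sketch of the split: string-typed child-workflow fields move onto the span;
// everything else stays in the output via rest spread.
function splitChildMeta(output: Record<string, unknown>): {
  output: Record<string, unknown>
  childWorkflowSnapshotId?: string
  childWorkflowId?: string
} {
  const { childWorkflowSnapshotId, childWorkflowId, ...rest } = output
  return {
    output: rest,
    childWorkflowSnapshotId:
      typeof childWorkflowSnapshotId === 'string' ? childWorkflowSnapshotId : undefined,
    childWorkflowId: typeof childWorkflowId === 'string' ? childWorkflowId : undefined,
  }
}
```

The `typeof` guards matter: block outputs are untyped at this boundary, so a non-string value in either field is dropped rather than propagated onto the span.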

View File

@@ -69,7 +69,7 @@ export interface ExecutionStatus {
export interface WorkflowExecutionSnapshot {
id: string
-workflowId: string
+workflowId: string | null
stateHash: string
stateData: WorkflowState
createdAt: string
@@ -80,7 +80,7 @@ export type WorkflowExecutionSnapshotSelect = WorkflowExecutionSnapshot
export interface WorkflowExecutionLog {
id: string
-workflowId: string
+workflowId: string | null
executionId: string
stateSnapshotId: string
level: 'info' | 'error'
@@ -178,6 +178,8 @@ export interface TraceSpan {
blockId?: string
input?: Record<string, unknown>
output?: Record<string, unknown>
childWorkflowSnapshotId?: string
childWorkflowId?: string
model?: string
cost?: {
input?: number

View File

@@ -325,18 +325,6 @@ const nextConfig: NextConfig = {
return redirects
},
-async rewrites() {
-return [
-{
-source: '/ingest/static/:path*',
-destination: 'https://us-assets.i.posthog.com/static/:path*',
-},
-{
-source: '/ingest/:path*',
-destination: 'https://us.i.posthog.com/:path*',
-},
-]
-},
}
export default nextConfig

View File

@@ -74,6 +74,7 @@
"@react-email/components": "^0.0.34",
"@react-email/render": "2.0.0",
"@sim/logger": "workspace:*",
"@socket.io/redis-adapter": "8.3.0",
"@t3-oss/env-nextjs": "0.13.4",
"@tanstack/react-query": "5.90.8",
"@tanstack/react-query-devtools": "5.90.2",
@@ -144,6 +145,7 @@
"react-simple-code-editor": "^0.14.1",
"react-window": "2.2.3",
"reactflow": "^11.11.4",
"redis": "5.10.0",
"rehype-autolink-headings": "^7.1.0",
"rehype-slug": "^6.0.0",
"remark-gfm": "4.0.1",

View File

@@ -134,6 +134,24 @@ function handleSecurityFiltering(request: NextRequest): NextResponse | null {
export async function proxy(request: NextRequest) {
const url = request.nextUrl
if (url.pathname.startsWith('/ingest/')) {
const hostname = url.pathname.startsWith('/ingest/static/')
? 'us-assets.i.posthog.com'
: 'us.i.posthog.com'
const targetPath = url.pathname.replace(/^\/ingest/, '')
const targetUrl = `https://${hostname}${targetPath}${url.search}`
return NextResponse.rewrite(new URL(targetUrl), {
request: {
headers: new Headers({
...Object.fromEntries(request.headers),
host: hostname,
}),
},
})
}
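The routing inside that middleware branch reduces to a pure URL computation: static assets under `/ingest/static/` go to PostHog's assets host, everything else under `/ingest/` to the main ingestion host, with the `/ingest` prefix stripped and the query string preserved. A standalone sketch:

```typescript
// Sketch of the PostHog proxy target computation used in the middleware above.
function posthogTarget(pathname: string, search = ''): string {
  const hostname = pathname.startsWith('/ingest/static/')
    ? 'us-assets.i.posthog.com'
    : 'us.i.posthog.com'
  const targetPath = pathname.replace(/^\/ingest/, '')
  return `https://${hostname}${targetPath}${search}`
}
```

Doing this in middleware rather than `rewrites()` matters because, per the commit message, Next.js rewrites can strip request bodies above ~1MB, while session-recording payloads can reach 64MB.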
const sessionCookie = getSessionCookie(request)
const hasActiveSession = isAuthDisabled || !!sessionCookie
@@ -195,6 +213,7 @@ export async function proxy(request: NextRequest) {
export const config = {
matcher: [
'/ingest/:path*', // PostHog proxy for session recording
'/', // Root path for self-hosted redirect logic
'/terms', // Whitelabel terms redirect
'/privacy', // Whitelabel privacy redirect

View File

@@ -1,5 +1,7 @@
import type { Server as HttpServer } from 'http'
import { createLogger } from '@sim/logger'
import { createAdapter } from '@socket.io/redis-adapter'
import { createClient, type RedisClientType } from 'redis'
import { Server } from 'socket.io'
import { env } from '@/lib/core/config/env'
import { isProd } from '@/lib/core/config/feature-flags'
@@ -7,9 +9,16 @@ import { getBaseUrl } from '@/lib/core/utils/urls'
const logger = createLogger('SocketIOConfig')
/**
* Get allowed origins for Socket.IO CORS configuration
*/
/** Socket.IO ping timeout - how long to wait for pong before considering connection dead */
const PING_TIMEOUT_MS = 60000
/** Socket.IO ping interval - how often to send ping packets */
const PING_INTERVAL_MS = 25000
/** Maximum HTTP buffer size for Socket.IO messages */
const MAX_HTTP_BUFFER_SIZE = 1e6
let adapterPubClient: RedisClientType | null = null
let adapterSubClient: RedisClientType | null = null
function getAllowedOrigins(): string[] {
const allowedOrigins = [
getBaseUrl(),
@@ -24,11 +33,10 @@ function getAllowedOrigins(): string[] {
}
/**
- * Create and configure a Socket.IO server instance
- * @param httpServer - The HTTP server instance to attach Socket.IO to
- * @returns Configured Socket.IO server instance
+ * Create and configure a Socket.IO server instance.
+ * If REDIS_URL is configured, adds Redis adapter for cross-pod broadcasting.
 */
-export function createSocketIOServer(httpServer: HttpServer): Server {
+export async function createSocketIOServer(httpServer: HttpServer): Promise<Server> {
const allowedOrigins = getAllowedOrigins()
const io = new Server(httpServer, {
@@ -36,31 +44,110 @@ export function createSocketIOServer(httpServer: HttpServer): Server {
origin: allowedOrigins,
methods: ['GET', 'POST', 'OPTIONS'],
allowedHeaders: ['Content-Type', 'Authorization', 'Cookie', 'socket.io'],
-credentials: true, // Enable credentials to accept cookies
+credentials: true,
},
-transports: ['websocket', 'polling'], // WebSocket first, polling as fallback
-allowEIO3: true, // Keep legacy support for compatibility
-pingTimeout: 60000, // Back to original conservative setting
-pingInterval: 25000, // Back to original interval
-maxHttpBufferSize: 1e6,
+transports: ['websocket', 'polling'],
+allowEIO3: true,
+pingTimeout: PING_TIMEOUT_MS,
+pingInterval: PING_INTERVAL_MS,
+maxHttpBufferSize: MAX_HTTP_BUFFER_SIZE,
cookie: {
name: 'io',
path: '/',
httpOnly: true,
-sameSite: 'none', // Required for cross-origin cookies
-secure: isProd, // HTTPS in production
+sameSite: 'none',
+secure: isProd,
},
})
if (env.REDIS_URL) {
logger.info('Configuring Socket.IO Redis adapter...')
const redisOptions = {
url: env.REDIS_URL,
socket: {
reconnectStrategy: (retries: number) => {
if (retries > 10) {
logger.error('Redis adapter reconnection failed after 10 attempts')
return new Error('Redis adapter reconnection failed')
}
const delay = Math.min(retries * 100, 3000)
logger.warn(`Redis adapter reconnecting in ${delay}ms (attempt ${retries})`)
return delay
},
},
}
// Create separate clients for pub and sub (recommended for reliability)
adapterPubClient = createClient(redisOptions)
adapterSubClient = createClient(redisOptions)
adapterPubClient.on('error', (err) => {
logger.error('Redis adapter pub client error:', err)
})
adapterSubClient.on('error', (err) => {
logger.error('Redis adapter sub client error:', err)
})
adapterPubClient.on('ready', () => {
logger.info('Redis adapter pub client ready')
})
adapterSubClient.on('ready', () => {
logger.info('Redis adapter sub client ready')
})
await Promise.all([adapterPubClient.connect(), adapterSubClient.connect()])
io.adapter(createAdapter(adapterPubClient, adapterSubClient))
logger.info('Socket.IO Redis adapter connected - cross-pod broadcasting enabled')
} else {
logger.warn('REDIS_URL not configured - running in single-pod mode')
}
logger.info('Socket.IO server configured with:', {
allowedOrigins: allowedOrigins.length,
transports: ['websocket', 'polling'],
-pingTimeout: 60000,
-pingInterval: 25000,
-maxHttpBufferSize: 1e6,
+pingTimeout: PING_TIMEOUT_MS,
+pingInterval: PING_INTERVAL_MS,
+maxHttpBufferSize: MAX_HTTP_BUFFER_SIZE,
cookieSecure: isProd,
corsCredentials: true,
redisAdapter: !!env.REDIS_URL,
})
return io
}
/**
* Clean up Redis adapter connections.
* Call this during graceful shutdown.
*/
export async function shutdownSocketIOAdapter(): Promise<void> {
const closePromises: Promise<void>[] = []
if (adapterPubClient) {
closePromises.push(
adapterPubClient.quit().then(() => {
logger.info('Redis adapter pub client closed')
adapterPubClient = null
})
)
}
if (adapterSubClient) {
closePromises.push(
adapterSubClient.quit().then(() => {
logger.info('Redis adapter sub client closed')
adapterSubClient = null
})
)
}
if (closePromises.length > 0) {
await Promise.all(closePromises)
logger.info('Socket.IO Redis adapter shutdown complete')
}
}
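The `reconnectStrategy` callback in the adapter configuration implements linear backoff with a cap and a retry limit: node-redis calls it with the attempt count, and it returns either a delay in milliseconds or an `Error` to stop reconnecting. Isolated as a pure function, matching the constants used above:

```typescript
// Sketch of the reconnect strategy: linear backoff (100ms per attempt) capped
// at 3s; after 10 attempts, return an Error to make the client give up.
function reconnectDelay(retries: number): number | Error {
  if (retries > 10) {
    return new Error('Redis adapter reconnection failed')
  }
  return Math.min(retries * 100, 3000)
}
```

Keeping the strategy cheap and bounded matters here because both the pub and sub clients share it; an unbounded retry loop on either would stall graceful shutdown.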

View File

@@ -1,17 +1,12 @@
import { createLogger } from '@sim/logger'
-import type { HandlerDependencies } from '@/socket/handlers/workflow'
+import { cleanupPendingSubblocksForSocket } from '@/socket/handlers/subblocks'
+import { cleanupPendingVariablesForSocket } from '@/socket/handlers/variables'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
-import type { RoomManager } from '@/socket/rooms/manager'
+import type { IRoomManager } from '@/socket/rooms'
const logger = createLogger('ConnectionHandlers')
-export function setupConnectionHandlers(
-socket: AuthenticatedSocket,
-deps: HandlerDependencies | RoomManager
-) {
-const roomManager =
-deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
+export function setupConnectionHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('error', (error) => {
logger.error(`Socket ${socket.id} error:`, error)
})
@@ -20,13 +15,22 @@ export function setupConnectionHandlers(
logger.error(`Socket ${socket.id} connection error:`, error)
})
-socket.on('disconnect', (reason) => {
-const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
-const session = roomManager.getUserSession(socket.id)
+socket.on('disconnect', async (reason) => {
try {
// Clean up pending debounce entries for this socket to prevent memory leaks
cleanupPendingSubblocksForSocket(socket.id)
cleanupPendingVariablesForSocket(socket.id)
-if (workflowId && session) {
-roomManager.cleanupUserFromRoom(socket.id, workflowId)
-roomManager.broadcastPresenceUpdate(workflowId)
+const workflowId = await roomManager.removeUserFromRoom(socket.id)
+if (workflowId) {
+await roomManager.broadcastPresenceUpdate(workflowId)
logger.info(
`Socket ${socket.id} disconnected from workflow ${workflowId} (reason: ${reason})`
)
}
} catch (error) {
logger.error(`Error handling disconnect for socket ${socket.id}:`, error)
}
})
}

View File

@@ -5,16 +5,9 @@ import { setupSubblocksHandlers } from '@/socket/handlers/subblocks'
import { setupVariablesHandlers } from '@/socket/handlers/variables'
import { setupWorkflowHandlers } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
-import type { RoomManager, UserPresence, WorkflowRoom } from '@/socket/rooms/manager'
+import type { IRoomManager } from '@/socket/rooms'
export type { UserPresence, WorkflowRoom }
-/**
- * Sets up all socket event handlers for an authenticated socket connection
- * @param socket - The authenticated socket instance
- * @param roomManager - Room manager instance for state management
- */
-export function setupAllHandlers(socket: AuthenticatedSocket, roomManager: RoomManager) {
+export function setupAllHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
setupWorkflowHandlers(socket, roomManager)
setupOperationsHandlers(socket, roomManager)
setupSubblocksHandlers(socket, roomManager)
@@ -22,12 +15,3 @@ export function setupAllHandlers(socket: AuthenticatedSocket, roomManager: RoomM
setupPresenceHandlers(socket, roomManager)
setupConnectionHandlers(socket, roomManager)
}
-export {
-setupWorkflowHandlers,
-setupOperationsHandlers,
-setupSubblocksHandlers,
-setupVariablesHandlers,
-setupPresenceHandlers,
-setupConnectionHandlers,
-}

View File

@@ -10,38 +10,41 @@ import {
WORKFLOW_OPERATIONS,
} from '@/socket/constants'
import { persistWorkflowOperation } from '@/socket/database/operations'
-import type { HandlerDependencies } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import { checkRolePermission } from '@/socket/middleware/permissions'
-import type { RoomManager } from '@/socket/rooms/manager'
+import type { IRoomManager } from '@/socket/rooms'
import { WorkflowOperationSchema } from '@/socket/validation/schemas'
const logger = createLogger('OperationsHandlers')
-export function setupOperationsHandlers(
-socket: AuthenticatedSocket,
-deps: HandlerDependencies | RoomManager
-) {
-const roomManager =
-deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
+export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('workflow-operation', async (data) => {
-const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
-const session = roomManager.getUserSession(socket.id)
+const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
+const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
-socket.emit('error', {
-type: 'NOT_JOINED',
-message: 'Not joined to any workflow',
+socket.emit('operation-forbidden', {
+type: 'SESSION_ERROR',
+message: 'Session expired, please rejoin workflow',
})
+if (data?.operationId) {
+socket.emit('operation-failed', { operationId: data.operationId, error: 'Session expired' })
+}
return
}
-const room = roomManager.getWorkflowRoom(workflowId)
-if (!room) {
-socket.emit('error', {
+const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
+if (!hasRoom) {
+socket.emit('operation-forbidden', {
type: 'ROOM_NOT_FOUND',
message: 'Workflow room not found',
})
+if (data?.operationId) {
+socket.emit('operation-failed', {
+operationId: data.operationId,
+error: 'Workflow room not found',
+})
+}
return
}
@@ -60,16 +63,18 @@ export function setupOperationsHandlers(
isPositionUpdate && 'commit' in payload ? payload.commit === true : false
const operationTimestamp = isPositionUpdate ? timestamp : Date.now()
+// Get user presence for permission checking
+const users = await roomManager.getWorkflowUsers(workflowId)
+const userPresence = users.find((u) => u.socketId === socket.id)
// Skip permission checks for non-committed position updates (broadcasts only, no persistence)
if (isPositionUpdate && !commitPositionUpdate) {
// Update last activity
-const userPresence = room.users.get(socket.id)
-if (userPresence) {
-userPresence.lastActivity = Date.now()
+await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })
}
} else {
// Check permissions from cached role for all other operations
-const userPresence = room.users.get(socket.id)
if (!userPresence) {
logger.warn(`User presence not found for socket ${socket.id}`)
socket.emit('operation-forbidden', {
@@ -78,10 +83,13 @@ export function setupOperationsHandlers(
operation,
target,
})
+if (operationId) {
+socket.emit('operation-failed', { operationId, error: 'User session not found' })
+}
return
}
-userPresence.lastActivity = Date.now()
+await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })
// Check permissions using cached role (no DB query)
const permissionCheck = checkRolePermission(userPresence.role, operation)
@@ -132,7 +140,7 @@ export function setupOperationsHandlers(
timestamp: operationTimestamp,
userId: session.userId,
})
-room.lastModified = Date.now()
+await roomManager.updateRoomLastModified(workflowId)
if (operationId) {
socket.emit('operation-confirmed', {
@@ -178,7 +186,7 @@ export function setupOperationsHandlers(
timestamp: operationTimestamp,
userId: session.userId,
})
-room.lastModified = Date.now()
+await roomManager.updateRoomLastModified(workflowId)
if (operationId) {
socket.emit('operation-confirmed', { operationId, serverTimestamp: Date.now() })
@@ -211,7 +219,7 @@ export function setupOperationsHandlers(
userId: session.userId,
})
-room.lastModified = Date.now()
+await roomManager.updateRoomLastModified(workflowId)
const broadcastData = {
operation,
@@ -251,7 +259,7 @@ export function setupOperationsHandlers(
userId: session.userId,
})
-room.lastModified = Date.now()
+await roomManager.updateRoomLastModified(workflowId)
const broadcastData = {
operation,
@@ -288,7 +296,7 @@ export function setupOperationsHandlers(
userId: session.userId,
})
-room.lastModified = Date.now()
+await roomManager.updateRoomLastModified(workflowId)
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -320,7 +328,7 @@ export function setupOperationsHandlers(
userId: session.userId,
})
-room.lastModified = Date.now()
+await roomManager.updateRoomLastModified(workflowId)
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -349,7 +357,7 @@ export function setupOperationsHandlers(
userId: session.userId,
})
-room.lastModified = Date.now()
+await roomManager.updateRoomLastModified(workflowId)
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -381,7 +389,7 @@ export function setupOperationsHandlers(
userId: session.userId,
})
-room.lastModified = Date.now()
+await roomManager.updateRoomLastModified(workflowId)
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -413,7 +421,7 @@ export function setupOperationsHandlers(
userId: session.userId,
})
-room.lastModified = Date.now()
+await roomManager.updateRoomLastModified(workflowId)
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -445,7 +453,7 @@ export function setupOperationsHandlers(
userId: session.userId,
})
-room.lastModified = Date.now()
+await roomManager.updateRoomLastModified(workflowId)
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -474,7 +482,7 @@ export function setupOperationsHandlers(
userId: session.userId,
})
-room.lastModified = Date.now()
+await roomManager.updateRoomLastModified(workflowId)
socket.to(workflowId).emit('workflow-operation', {
operation,
@@ -503,27 +511,24 @@ export function setupOperationsHandlers(
userId: session.userId,
})
-room.lastModified = Date.now()
+await roomManager.updateRoomLastModified(workflowId)
const broadcastData = {
operation,
target,
payload,
-timestamp: operationTimestamp, // Preserve client timestamp for position updates
+timestamp: operationTimestamp,
senderId: socket.id,
userId: session.userId,
userName: session.userName,
// Add operation metadata for better client handling
metadata: {
workflowId,
operationId: crypto.randomUUID(),
isPositionUpdate, // Flag to help clients handle position updates specially
},
}
socket.to(workflowId).emit('workflow-operation', broadcastData)
// Emit confirmation if operationId is provided
if (operationId) {
socket.emit('operation-confirmed', {
operationId,
@@ -533,16 +538,14 @@ export function setupOperationsHandlers(
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error occurred'
// Emit operation-failed for queue-tracked operations
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: errorMessage,
-retryable: !(error instanceof ZodError), // Don't retry validation errors
+retryable: !(error instanceof ZodError),
})
}
// Also emit legacy operation-error for backward compatibility
if (error instanceof ZodError) {
socket.emit('operation-error', {
type: 'VALIDATION_ERROR',
@@ -553,7 +556,6 @@ export function setupOperationsHandlers(
})
logger.warn(`Validation error for operation from ${session.userId}:`, error.errors)
} else if (error instanceof Error) {
// Handle specific database errors
if (error.message.includes('not found')) {
socket.emit('operation-error', {
type: 'RESOURCE_NOT_FOUND',
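Throughout this handler, any operation that carries an `operationId` is answered with either `operation-confirmed` or `operation-failed`. A client-side sketch of how such acks could be tracked (illustrative only; `PendingOps` is not code from this repo, and the timeout fallback is an assumption):

```typescript
// Tracks operations awaiting a server ack. A confirm resolves true, a
// fail (or a timeout with no ack) resolves false so the caller can retry.
class PendingOps {
  private pending = new Map<string, { resolve: (ok: boolean) => void; timer: ReturnType<typeof setTimeout> }>()

  track(operationId: string, timeoutMs = 5000): Promise<boolean> {
    return new Promise((resolve) => {
      const timer = setTimeout(() => {
        // No ack arrived in time: give up on this operation.
        this.pending.delete(operationId)
        resolve(false)
      }, timeoutMs)
      this.pending.set(operationId, { resolve, timer })
    })
  }

  // Wire these to the 'operation-confirmed' / 'operation-failed' events.
  confirm(operationId: string) { this.settle(operationId, true) }
  fail(operationId: string) { this.settle(operationId, false) }

  private settle(operationId: string, ok: boolean) {
    const entry = this.pending.get(operationId)
    if (!entry) return // ack for an operation we no longer track
    clearTimeout(entry.timer)
    this.pending.delete(operationId)
    entry.resolve(ok)
  }
}
```

This mirrors why the server now emits `operation-failed` alongside `operation-forbidden` in the guard paths: without a per-`operationId` failure event, a queued client operation would hang until its timeout.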


@@ -1,62 +1,53 @@
import { createLogger } from '@sim/logger'
-import type { HandlerDependencies } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
-import type { RoomManager } from '@/socket/rooms/manager'
+import type { IRoomManager } from '@/socket/rooms'
const logger = createLogger('PresenceHandlers')
-export function setupPresenceHandlers(
-socket: AuthenticatedSocket,
-deps: HandlerDependencies | RoomManager
-) {
-const roomManager =
-deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
-socket.on('cursor-update', ({ cursor }) => {
-const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
-const session = roomManager.getUserSession(socket.id)
+export function setupPresenceHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
+socket.on('cursor-update', async ({ cursor }) => {
+try {
+const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
+const session = await roomManager.getUserSession(socket.id)
-if (!workflowId || !session) return
+if (!workflowId || !session) return
const room = roomManager.getWorkflowRoom(workflowId)
if (!room) return
// Update cursor in room state
await roomManager.updateUserActivity(workflowId, socket.id, { cursor })
const userPresence = room.users.get(socket.id)
if (userPresence) {
userPresence.cursor = cursor
userPresence.lastActivity = Date.now()
// Broadcast to other users in the room
socket.to(workflowId).emit('cursor-update', {
socketId: socket.id,
userId: session.userId,
userName: session.userName,
avatarUrl: session.avatarUrl,
cursor,
})
} catch (error) {
logger.error(`Error handling cursor update for socket ${socket.id}:`, error)
}
socket.to(workflowId).emit('cursor-update', {
socketId: socket.id,
userId: session.userId,
userName: session.userName,
avatarUrl: session.avatarUrl,
cursor,
})
})
// Handle user selection (for showing what block/element a user has selected)
socket.on('selection-update', ({ selection }) => {
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
socket.on('selection-update', async ({ selection }) => {
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) return
if (!workflowId || !session) return
const room = roomManager.getWorkflowRoom(workflowId)
if (!room) return
// Update selection in room state
await roomManager.updateUserActivity(workflowId, socket.id, { selection })
const userPresence = room.users.get(socket.id)
if (userPresence) {
userPresence.selection = selection
userPresence.lastActivity = Date.now()
// Broadcast to other users in the room
socket.to(workflowId).emit('selection-update', {
socketId: socket.id,
userId: session.userId,
userName: session.userName,
avatarUrl: session.avatarUrl,
selection,
})
} catch (error) {
logger.error(`Error handling selection update for socket ${socket.id}:`, error)
}
socket.to(workflowId).emit('selection-update', {
socketId: socket.id,
userId: session.userId,
userName: session.userName,
avatarUrl: session.avatarUrl,
selection,
})
})
}


@@ -2,12 +2,14 @@ import { db } from '@sim/db'
import { workflow, workflowBlocks } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
-import type { HandlerDependencies } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
-import type { RoomManager } from '@/socket/rooms/manager'
+import type { IRoomManager } from '@/socket/rooms'
const logger = createLogger('SubblocksHandlers')
+/** Debounce interval for coalescing rapid subblock updates before persisting */
+const DEBOUNCE_INTERVAL_MS = 25
type PendingSubblock = {
latest: { blockId: string; subblockId: string; value: any; timestamp: number }
timeout: NodeJS.Timeout
@@ -18,44 +20,61 @@ type PendingSubblock = {
// Keyed by `${workflowId}:${blockId}:${subblockId}`
const pendingSubblockUpdates = new Map<string, PendingSubblock>()
-export function setupSubblocksHandlers(
-socket: AuthenticatedSocket,
-deps: HandlerDependencies | RoomManager
-) {
-const roomManager =
-deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
+/**
+ * Cleans up pending updates for a disconnected socket.
+ * Removes the socket's operationIds from pending updates to prevent memory leaks.
+ */
+export function cleanupPendingSubblocksForSocket(socketId: string): void {
+for (const [, pending] of pendingSubblockUpdates.entries()) {
+// Remove this socket's operation entries
+for (const [opId, sid] of pending.opToSocket.entries()) {
+if (sid === socketId) {
+pending.opToSocket.delete(opId)
+}
+}
+// If no more operations are waiting, the timeout will still fire and flush
+// This is fine - the update will still persist, just no confirmation to send
+}
+}
+export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('subblock-update', async (data) => {
-const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
-const session = roomManager.getUserSession(socket.id)
-if (!workflowId || !session) {
-logger.debug(`Ignoring subblock update: socket not connected to any workflow room`, {
-socketId: socket.id,
-hasWorkflowId: !!workflowId,
-hasSession: !!session,
-})
-return
-}
const { blockId, subblockId, value, timestamp, operationId } = data
-const room = roomManager.getWorkflowRoom(workflowId)
-if (!room) {
-logger.debug(`Ignoring subblock update: workflow room not found`, {
-socketId: socket.id,
-workflowId,
-blockId,
-subblockId,
-})
-return
-}
try {
-const userPresence = room.users.get(socket.id)
-if (userPresence) {
-userPresence.lastActivity = Date.now()
+const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
+const session = await roomManager.getUserSession(socket.id)
+if (!workflowId || !session) {
+logger.debug(`Ignoring subblock update: socket not connected to any workflow room`, {
+socketId: socket.id,
+hasWorkflowId: !!workflowId,
+hasSession: !!session,
+})
+socket.emit('operation-forbidden', {
+type: 'SESSION_ERROR',
+message: 'Session expired, please rejoin workflow',
+})
+if (operationId) {
+socket.emit('operation-failed', { operationId, error: 'Session expired' })
+}
+return
+}
+const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
+if (!hasRoom) {
+logger.debug(`Ignoring subblock update: workflow room not found`, {
+socketId: socket.id,
+workflowId,
+blockId,
+subblockId,
+})
+return
+}
+// Update user activity
+await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })
// Server-side debounce/coalesce by workflowId+blockId+subblockId
const debouncedKey = `${workflowId}:${blockId}:${subblockId}`
const existing = pendingSubblockUpdates.get(debouncedKey)
@@ -66,7 +85,7 @@ export function setupSubblocksHandlers(
existing.timeout = setTimeout(async () => {
await flushSubblockUpdate(workflowId, existing, roomManager)
pendingSubblockUpdates.delete(debouncedKey)
-}, 25)
+}, DEBOUNCE_INTERVAL_MS)
} else {
const opToSocket = new Map<string, string>()
if (operationId) opToSocket.set(operationId, socket.id)
@@ -76,7 +95,7 @@ export function setupSubblocksHandlers(
await flushSubblockUpdate(workflowId, pending, roomManager)
pendingSubblockUpdates.delete(debouncedKey)
}
-}, 25)
+}, DEBOUNCE_INTERVAL_MS)
pendingSubblockUpdates.set(debouncedKey, {
latest: { blockId, subblockId, value, timestamp },
timeout,
@@ -88,7 +107,6 @@ export function setupSubblocksHandlers(
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
// Best-effort failure for the single operation if provided
if (operationId) {
socket.emit('operation-failed', {
operationId,
@@ -97,7 +115,6 @@ export function setupSubblocksHandlers(
})
}
// Also emit legacy operation-error for backward compatibility
socket.emit('operation-error', {
type: 'SUBBLOCK_UPDATE_FAILED',
message: `Failed to update subblock ${blockId}.${subblockId}: ${errorMessage}`,
@@ -111,9 +128,11 @@ export function setupSubblocksHandlers(
async function flushSubblockUpdate(
workflowId: string,
pending: PendingSubblock,
-roomManager: RoomManager
+roomManager: IRoomManager
) {
const { blockId, subblockId, value, timestamp } = pending.latest
+const io = roomManager.io
try {
// Verify workflow still exists
const workflowExists = await db
@@ -124,14 +143,11 @@ async function flushSubblockUpdate(
if (workflowExists.length === 0) {
pending.opToSocket.forEach((socketId, opId) => {
-const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
-if (sock) {
-sock.emit('operation-failed', {
-operationId: opId,
-error: 'Workflow not found',
-retryable: false,
-})
-}
+io.to(socketId).emit('operation-failed', {
+operationId: opId,
+error: 'Workflow not found',
+retryable: false,
+})
})
return
}
@@ -164,60 +180,48 @@ async function flushSubblockUpdate(
})
if (updateSuccessful) {
// Broadcast to other clients (exclude senders to avoid overwriting their local state)
-const senderSocketIds = new Set(pending.opToSocket.values())
-const io = (roomManager as any).io
-if (io) {
-// Get all sockets in the room
-const roomSockets = io.sockets.adapter.rooms.get(workflowId)
-if (roomSockets) {
-roomSockets.forEach((socketId: string) => {
-// Only emit to sockets that didn't send any of the coalesced ops
-if (!senderSocketIds.has(socketId)) {
-const sock = io.sockets.sockets.get(socketId)
-if (sock) {
-sock.emit('subblock-update', {
-blockId,
-subblockId,
-value,
-timestamp,
-})
-}
-}
-})
-}
+// Broadcast to room excluding all senders (works cross-pod via Redis adapter)
+const senderSocketIds = [...pending.opToSocket.values()]
+if (senderSocketIds.length > 0) {
+io.to(workflowId).except(senderSocketIds).emit('subblock-update', {
+blockId,
+subblockId,
+value,
+timestamp,
+})
+} else {
+io.to(workflowId).emit('subblock-update', {
+blockId,
+subblockId,
+value,
+timestamp,
+})
+}
-// Confirm all coalesced operationIds
+// Confirm all coalesced operationIds (io.to(socketId) works cross-pod)
pending.opToSocket.forEach((socketId, opId) => {
-const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
-if (sock) {
-sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() })
-}
+io.to(socketId).emit('operation-confirmed', {
+operationId: opId,
+serverTimestamp: Date.now(),
+})
})
} else {
pending.opToSocket.forEach((socketId, opId) => {
-const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
-if (sock) {
-sock.emit('operation-failed', {
-operationId: opId,
-error: 'Block no longer exists',
-retryable: false,
-})
-}
+io.to(socketId).emit('operation-failed', {
+operationId: opId,
+error: 'Block no longer exists',
+retryable: false,
+})
})
}
} catch (error) {
logger.error('Error flushing subblock update:', error)
pending.opToSocket.forEach((socketId, opId) => {
-const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
-if (sock) {
-sock.emit('operation-failed', {
-operationId: opId,
-error: error instanceof Error ? error.message : 'Unknown error',
-retryable: true,
-})
-}
+io.to(socketId).emit('operation-failed', {
+operationId: opId,
+error: error instanceof Error ? error.message : 'Unknown error',
+retryable: true,
+})
})
}
}
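The server-side debounce above coalesces bursts of updates to the same `${workflowId}:${blockId}:${subblockId}` key so only the latest value is flushed after a quiet period. The pattern in isolation, with illustrative names (the real code also tracks `opToSocket` for acks; this sketch assumes the pending timer is reset on each update):

```typescript
// Quiet period before a coalesced update is flushed.
const DEBOUNCE_INTERVAL_MS = 25

type Pending<T> = { latest: T; timeout: ReturnType<typeof setTimeout> }

// Returns a push function: rapid pushes to the same key are merged, and
// `flush` is invoked once per key with the most recent value.
function makeCoalescer<T>(flush: (key: string, latest: T) => void) {
  const pending = new Map<string, Pending<T>>()
  return (key: string, value: T) => {
    const existing = pending.get(key)
    if (existing) {
      existing.latest = value // keep only the newest value
      clearTimeout(existing.timeout) // restart the quiet-period timer
      existing.timeout = setTimeout(() => {
        flush(key, existing.latest)
        pending.delete(key)
      }, DEBOUNCE_INTERVAL_MS)
    } else {
      const entry: Pending<T> = {
        latest: value,
        timeout: setTimeout(() => {
          flush(key, entry.latest)
          pending.delete(key)
        }, DEBOUNCE_INTERVAL_MS),
      }
      pending.set(key, entry)
    }
  }
}
```

Keying by workflow, block, and subblock means concurrent edits to different fields never delay each other, while a fast typist editing one field produces a single database write per quiet period.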


@@ -2,12 +2,14 @@ import { db } from '@sim/db'
import { workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
-import type { HandlerDependencies } from '@/socket/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
-import type { RoomManager } from '@/socket/rooms/manager'
+import type { IRoomManager } from '@/socket/rooms'
const logger = createLogger('VariablesHandlers')
+/** Debounce interval for coalescing rapid variable updates before persisting */
+const DEBOUNCE_INTERVAL_MS = 25
type PendingVariable = {
latest: { variableId: string; field: string; value: any; timestamp: number }
timeout: NodeJS.Timeout
@@ -17,45 +19,58 @@ type PendingVariable = {
// Keyed by `${workflowId}:${variableId}:${field}`
const pendingVariableUpdates = new Map<string, PendingVariable>()
-export function setupVariablesHandlers(
-socket: AuthenticatedSocket,
-deps: HandlerDependencies | RoomManager
-) {
-const roomManager =
-deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
+/**
+ * Cleans up pending updates for a disconnected socket.
+ * Removes the socket's operationIds from pending updates to prevent memory leaks.
+ */
+export function cleanupPendingVariablesForSocket(socketId: string): void {
+for (const [, pending] of pendingVariableUpdates.entries()) {
+for (const [opId, sid] of pending.opToSocket.entries()) {
+if (sid === socketId) {
+pending.opToSocket.delete(opId)
+}
+}
+}
+}
+export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('variable-update', async (data) => {
-const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
-const session = roomManager.getUserSession(socket.id)
-if (!workflowId || !session) {
-logger.debug(`Ignoring variable update: socket not connected to any workflow room`, {
-socketId: socket.id,
-hasWorkflowId: !!workflowId,
-hasSession: !!session,
-})
-return
-}
const { variableId, field, value, timestamp, operationId } = data
-const room = roomManager.getWorkflowRoom(workflowId)
-if (!room) {
-logger.debug(`Ignoring variable update: workflow room not found`, {
-socketId: socket.id,
-workflowId,
-variableId,
-field,
-})
-return
-}
try {
-const userPresence = room.users.get(socket.id)
-if (userPresence) {
-userPresence.lastActivity = Date.now()
+const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
+const session = await roomManager.getUserSession(socket.id)
+if (!workflowId || !session) {
+logger.debug(`Ignoring variable update: socket not connected to any workflow room`, {
+socketId: socket.id,
+hasWorkflowId: !!workflowId,
+hasSession: !!session,
+})
+socket.emit('operation-forbidden', {
+type: 'SESSION_ERROR',
+message: 'Session expired, please rejoin workflow',
+})
+if (operationId) {
+socket.emit('operation-failed', { operationId, error: 'Session expired' })
+}
+return
+}
+const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
+if (!hasRoom) {
+logger.debug(`Ignoring variable update: workflow room not found`, {
+socketId: socket.id,
+workflowId,
+variableId,
+field,
+})
+return
+}
+// Update user activity
+await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })
const debouncedKey = `${workflowId}:${variableId}:${field}`
const existing = pendingVariableUpdates.get(debouncedKey)
if (existing) {
@@ -65,7 +80,7 @@ export function setupVariablesHandlers(
existing.timeout = setTimeout(async () => {
await flushVariableUpdate(workflowId, existing, roomManager)
pendingVariableUpdates.delete(debouncedKey)
-}, 25)
+}, DEBOUNCE_INTERVAL_MS)
} else {
const opToSocket = new Map<string, string>()
if (operationId) opToSocket.set(operationId, socket.id)
@@ -75,7 +90,7 @@ export function setupVariablesHandlers(
await flushVariableUpdate(workflowId, pending, roomManager)
pendingVariableUpdates.delete(debouncedKey)
}
-}, 25)
+}, DEBOUNCE_INTERVAL_MS)
pendingVariableUpdates.set(debouncedKey, {
latest: { variableId, field, value, timestamp },
timeout,
@@ -108,9 +123,11 @@ export function setupVariablesHandlers(
async function flushVariableUpdate(
workflowId: string,
pending: PendingVariable,
-roomManager: RoomManager
+roomManager: IRoomManager
) {
const { variableId, field, value, timestamp } = pending.latest
+const io = roomManager.io
try {
const workflowExists = await db
.select({ id: workflow.id })
@@ -120,14 +137,11 @@ async function flushVariableUpdate(
if (workflowExists.length === 0) {
pending.opToSocket.forEach((socketId, opId) => {
-const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
-if (sock) {
-sock.emit('operation-failed', {
-operationId: opId,
-error: 'Workflow not found',
-retryable: false,
-})
-}
+io.to(socketId).emit('operation-failed', {
+operationId: opId,
+error: 'Workflow not found',
+retryable: false,
+})
})
return
}
@@ -163,59 +177,50 @@ async function flushVariableUpdate(
})
if (updateSuccessful) {
// Broadcast to other clients (exclude senders to avoid overwriting their local state)
-const senderSocketIds = new Set(pending.opToSocket.values())
-const io = (roomManager as any).io
-if (io) {
-const roomSockets = io.sockets.adapter.rooms.get(workflowId)
-if (roomSockets) {
-roomSockets.forEach((socketId: string) => {
-if (!senderSocketIds.has(socketId)) {
-const sock = io.sockets.sockets.get(socketId)
-if (sock) {
-sock.emit('variable-update', {
-variableId,
-field,
-value,
-timestamp,
-})
-}
-}
-})
-}
+// Broadcast to room excluding all senders (works cross-pod via Redis adapter)
+const senderSocketIds = [...pending.opToSocket.values()]
+if (senderSocketIds.length > 0) {
+io.to(workflowId).except(senderSocketIds).emit('variable-update', {
+variableId,
+field,
+value,
+timestamp,
+})
+} else {
+io.to(workflowId).emit('variable-update', {
+variableId,
+field,
+value,
+timestamp,
+})
+}
+// Confirm all coalesced operationIds (io.to(socketId) works cross-pod)
pending.opToSocket.forEach((socketId, opId) => {
-const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
-if (sock) {
-sock.emit('operation-confirmed', { operationId: opId, serverTimestamp: Date.now() })
-}
+io.to(socketId).emit('operation-confirmed', {
+operationId: opId,
+serverTimestamp: Date.now(),
+})
})
logger.debug(`Flushed variable update ${workflowId}: ${variableId}.${field}`)
} else {
pending.opToSocket.forEach((socketId, opId) => {
-const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
-if (sock) {
-sock.emit('operation-failed', {
-operationId: opId,
-error: 'Variable no longer exists',
-retryable: false,
-})
-}
+io.to(socketId).emit('operation-failed', {
+operationId: opId,
+error: 'Variable no longer exists',
+retryable: false,
+})
})
}
} catch (error) {
logger.error('Error flushing variable update:', error)
pending.opToSocket.forEach((socketId, opId) => {
-const sock = (roomManager as any).io?.sockets?.sockets?.get(socketId)
-if (sock) {
-sock.emit('operation-failed', {
-operationId: opId,
-error: error instanceof Error ? error.message : 'Unknown error',
-retryable: true,
-})
-}
+io.to(socketId).emit('operation-failed', {
+operationId: opId,
+error: error instanceof Error ? error.message : 'Unknown error',
+retryable: true,
+})
})
}
}


@@ -4,38 +4,12 @@ import { eq } from 'drizzle-orm'
import { getWorkflowState } from '@/socket/database/operations'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import { verifyWorkflowAccess } from '@/socket/middleware/permissions'
-import type { RoomManager, UserPresence, WorkflowRoom } from '@/socket/rooms/manager'
+import type { IRoomManager, UserPresence } from '@/socket/rooms'
const logger = createLogger('WorkflowHandlers')
-export type { UserPresence, WorkflowRoom }
-export interface HandlerDependencies {
-roomManager: RoomManager
-}
-export const createWorkflowRoom = (workflowId: string): WorkflowRoom => ({
-workflowId,
-users: new Map(),
-lastModified: Date.now(),
-activeConnections: 0,
-})
-export const cleanupUserFromRoom = (
-socketId: string,
-workflowId: string,
-roomManager: RoomManager
-) => {
-roomManager.cleanupUserFromRoom(socketId, workflowId)
-}
-export function setupWorkflowHandlers(
-socket: AuthenticatedSocket,
-deps: HandlerDependencies | RoomManager
-) {
-const roomManager =
-deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
-socket.on('join-workflow', async ({ workflowId }) => {
+export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
+socket.on('join-workflow', async ({ workflowId, tabSessionId }) => {
try {
const userId = socket.userId
const userName = socket.userName
@@ -48,6 +22,7 @@ export function setupWorkflowHandlers(
logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`)
// Verify workflow access
let userRole: string
try {
const accessInfo = await verifyWorkflowAccess(userId, workflowId)
@@ -63,23 +38,37 @@ export function setupWorkflowHandlers(
return
}
-const currentWorkflowId = roomManager.getWorkflowIdForSocket(socket.id)
+// Leave current room if in one
+const currentWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
if (currentWorkflowId) {
socket.leave(currentWorkflowId)
-roomManager.cleanupUserFromRoom(socket.id, currentWorkflowId)
-roomManager.broadcastPresenceUpdate(currentWorkflowId)
+await roomManager.removeUserFromRoom(socket.id)
+await roomManager.broadcastPresenceUpdate(currentWorkflowId)
}
+const STALE_THRESHOLD_MS = 60_000
+const now = Date.now()
+const existingUsers = await roomManager.getWorkflowUsers(workflowId)
+for (const existingUser of existingUsers) {
+if (existingUser.userId === userId && existingUser.socketId !== socket.id) {
+const isSameTab = tabSessionId && existingUser.tabSessionId === tabSessionId
+const isStale =
+now - (existingUser.lastActivity || existingUser.joinedAt || 0) > STALE_THRESHOLD_MS
+if (isSameTab || isStale) {
+logger.info(
+`Cleaning up socket ${existingUser.socketId} for user ${userId} (${isSameTab ? 'same tab' : 'stale'})`
+)
+await roomManager.removeUserFromRoom(existingUser.socketId)
+roomManager.io.in(existingUser.socketId).socketsLeave(workflowId)
+}
+}
+}
// Join the new room
socket.join(workflowId)
-if (!roomManager.hasWorkflowRoom(workflowId)) {
-roomManager.setWorkflowRoom(workflowId, roomManager.createWorkflowRoom(workflowId))
-}
-const room = roomManager.getWorkflowRoom(workflowId)!
-room.activeConnections++
// Get avatar URL
let avatarUrl = socket.userImage || null
if (!avatarUrl) {
try {
@@ -95,54 +84,68 @@ export function setupWorkflowHandlers(
}
}
// Create presence entry
const userPresence: UserPresence = {
userId,
workflowId,
userName,
socketId: socket.id,
tabSessionId,
joinedAt: Date.now(),
lastActivity: Date.now(),
role: userRole,
avatarUrl,
}
-room.users.set(socket.id, userPresence)
-roomManager.setWorkflowForSocket(socket.id, workflowId)
-roomManager.setUserSession(socket.id, {
-userId,
-userName,
-avatarUrl,
+// Add user to room
+await roomManager.addUserToRoom(workflowId, socket.id, userPresence)
+// Get current presence list for the join acknowledgment
+const presenceUsers = await roomManager.getWorkflowUsers(workflowId)
// Get workflow state
const workflowState = await getWorkflowState(workflowId)
// Send join success with presence list (client waits for this to confirm join)
socket.emit('join-workflow-success', {
workflowId,
socketId: socket.id,
presenceUsers,
})
const workflowState = await getWorkflowState(workflowId)
// Send workflow state
socket.emit('workflow-state', workflowState)
-roomManager.broadcastPresenceUpdate(workflowId)
+// Broadcast presence update to all users in the room
+await roomManager.broadcastPresenceUpdate(workflowId)
-const uniqueUserCount = roomManager.getUniqueUserCount(workflowId)
+const uniqueUserCount = await roomManager.getUniqueUserCount(workflowId)
logger.info(
-`User ${userId} (${userName}) joined workflow ${workflowId}. Room now has ${uniqueUserCount} unique users (${room.activeConnections} connections).`
+`User ${userId} (${userName}) joined workflow ${workflowId}. Room now has ${uniqueUserCount} unique users.`
)
} catch (error) {
logger.error('Error joining workflow:', error)
-socket.emit('error', {
-type: 'JOIN_ERROR',
-message: 'Failed to join workflow',
-})
+// Undo socket.join and room manager entry if any operation failed
+socket.leave(workflowId)
+await roomManager.removeUserFromRoom(socket.id)
+socket.emit('join-workflow-error', { error: 'Failed to join workflow' })
}
})
-socket.on('leave-workflow', () => {
-const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
-const session = roomManager.getUserSession(socket.id)
+socket.on('leave-workflow', async () => {
+try {
+const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
+const session = await roomManager.getUserSession(socket.id)
-if (workflowId && session) {
-socket.leave(workflowId)
-roomManager.cleanupUserFromRoom(socket.id, workflowId)
+if (workflowId && session) {
+socket.leave(workflowId)
+await roomManager.removeUserFromRoom(socket.id)
+await roomManager.broadcastPresenceUpdate(workflowId)
-roomManager.broadcastPresenceUpdate(workflowId)
+logger.info(`User ${session.userId} (${session.userName}) left workflow ${workflowId}`)
-logger.info(`User ${session.userId} (${session.userName}) left workflow ${workflowId}`)
}
+} catch (error) {
+logger.error('Error leaving workflow:', error)
+}
})
}
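The join-workflow handler above evicts a user's earlier socket when it belongs to the same browser tab (`tabSessionId` matches) or has gone stale. The decision logic extracted as a pure function (a sketch; `shouldEvict` and `PresenceEntry` are illustrative names, not code from the repo):

```typescript
// A presence entry is considered stale after a minute of inactivity.
const STALE_THRESHOLD_MS = 60_000

interface PresenceEntry {
  userId: string
  socketId: string
  tabSessionId?: string
  joinedAt: number
  lastActivity?: number
}

// Decide whether an existing presence entry should be cleaned up when
// another socket for the same user joins the workflow room.
function shouldEvict(
  existing: PresenceEntry,
  joining: { userId: string; socketId: string; tabSessionId?: string },
  now: number
): boolean {
  // Never evict other users, or the joining socket itself.
  if (existing.userId !== joining.userId) return false
  if (existing.socketId === joining.socketId) return false

  // Same browser tab reconnecting: the old socket is definitely dead.
  const isSameTab = !!joining.tabSessionId && existing.tabSessionId === joining.tabSessionId
  // Fall back to joinedAt when lastActivity was never recorded.
  const isStale = now - (existing.lastActivity ?? existing.joinedAt ?? 0) > STALE_THRESHOLD_MS
  return isSameTab || isStale
}
```

Keeping this as a same-tab-or-stale test (rather than evicting every duplicate) preserves legitimate multi-tab sessions for the same user while still reclaiming ghost connections left behind by refreshes and dropped sockets.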


@@ -7,7 +7,7 @@ import { createServer, request as httpRequest } from 'http'
import { createMockLogger, databaseMock } from '@sim/testing'
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'
import { createSocketIOServer } from '@/socket/config/socket'
-import { RoomManager } from '@/socket/rooms/manager'
+import { MemoryRoomManager } from '@/socket/rooms'
import { createHttpHandler } from '@/socket/routes/http'
vi.mock('@/lib/auth', () => ({
@@ -20,6 +20,30 @@ vi.mock('@/lib/auth', () => ({
vi.mock('@sim/db', () => databaseMock)
+// Mock redis package to prevent actual Redis connections
+vi.mock('redis', () => ({
+createClient: vi.fn(() => ({
+on: vi.fn(),
+connect: vi.fn().mockResolvedValue(undefined),
+quit: vi.fn().mockResolvedValue(undefined),
+duplicate: vi.fn().mockReturnThis(),
+})),
+}))
+// Mock env to not have REDIS_URL (use importOriginal to get helper functions)
+vi.mock('@/lib/core/config/env', async (importOriginal) => {
+const actual = await importOriginal<typeof import('@/lib/core/config/env')>()
+return {
+...actual,
+env: {
+...actual.env,
+DATABASE_URL: 'postgres://localhost/test',
+NODE_ENV: 'test',
+REDIS_URL: undefined,
+},
+}
+})
vi.mock('@/socket/middleware/auth', () => ({
authenticateSocket: vi.fn((socket, next) => {
socket.userId = 'test-user-id'
@@ -51,7 +75,7 @@ vi.mock('@/socket/database/operations', () => ({
describe('Socket Server Index Integration', () => {
let httpServer: any
let io: any
let roomManager: MemoryRoomManager
let logger: ReturnType<typeof createMockLogger>
let PORT: number
@@ -64,9 +88,10 @@ describe('Socket Server Index Integration', () => {
httpServer = createServer()
io = await createSocketIOServer(httpServer)
roomManager = new MemoryRoomManager(io)
await roomManager.initialize()
const httpHandler = createHttpHandler(roomManager, logger)
httpServer.on('request', httpHandler)
@@ -98,6 +123,9 @@ describe('Socket Server Index Integration', () => {
}, 20000)
afterEach(async () => {
if (roomManager) {
await roomManager.shutdown()
}
if (io) {
await new Promise<void>((resolve) => {
io.close(() => resolve())
@@ -177,43 +205,60 @@ describe('Socket Server Index Integration', () => {
})
describe('Room Manager Integration', () => {
it('should create room manager successfully', async () => {
  expect(roomManager).toBeDefined()
  expect(await roomManager.getTotalActiveConnections()).toBe(0)
})
it('should add and get users from workflow rooms', async () => {
  const workflowId = 'test-workflow-123'
  const socketId = 'test-socket-123'
  const presence = {
    userId: 'user-123',
    workflowId,
    userName: 'Test User',
    socketId,
    joinedAt: Date.now(),
    lastActivity: Date.now(),
    role: 'admin',
  }
  await roomManager.addUserToRoom(workflowId, socketId, presence)
  expect(await roomManager.hasWorkflowRoom(workflowId)).toBe(true)
  const users = await roomManager.getWorkflowUsers(workflowId)
  expect(users).toHaveLength(1)
  expect(users[0].socketId).toBe(socketId)
})
it('should manage user sessions', async () => {
  const socketId = 'test-socket-123'
  const workflowId = 'test-workflow-456'
  const presence = {
    userId: 'user-123',
    workflowId,
    userName: 'Test User',
    socketId,
    joinedAt: Date.now(),
    lastActivity: Date.now(),
    role: 'admin',
  }
  await roomManager.addUserToRoom(workflowId, socketId, presence)
  expect(await roomManager.getWorkflowIdForSocket(socketId)).toBe(workflowId)
  const session = await roomManager.getUserSession(socketId)
  expect(session).toBeDefined()
  expect(session?.userId).toBe('user-123')
})
it('should clean up rooms properly', async () => {
  const workflowId = 'test-workflow-789'
  const socketId = 'test-socket-789'
  const presence = {
    userId: 'user-789',
    workflowId,
    userName: 'Test User',
@@ -221,16 +266,18 @@ describe('Socket Server Index Integration', () => {
    joinedAt: Date.now(),
    lastActivity: Date.now(),
    role: 'admin',
  }
  await roomManager.addUserToRoom(workflowId, socketId, presence)
  expect(await roomManager.hasWorkflowRoom(workflowId)).toBe(true)
  // Remove user
  await roomManager.removeUserFromRoom(socketId)
  // Room should be cleaned up since it's now empty
  expect(await roomManager.hasWorkflowRoom(workflowId)).toBe(false)
  expect(await roomManager.getWorkflowIdForSocket(socketId)).toBeNull()
})
})
@@ -238,7 +285,7 @@ describe('Socket Server Index Integration', () => {
it.concurrent('should properly import all extracted modules', async () => {
const { createSocketIOServer } = await import('@/socket/config/socket')
const { createHttpHandler } = await import('@/socket/routes/http')
const { MemoryRoomManager, RedisRoomManager } = await import('@/socket/rooms')
const { authenticateSocket } = await import('@/socket/middleware/auth')
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
const { getWorkflowState } = await import('@/socket/database/operations')
@@ -246,22 +293,23 @@ describe('Socket Server Index Integration', () => {
expect(createSocketIOServer).toBeTypeOf('function')
expect(createHttpHandler).toBeTypeOf('function')
expect(MemoryRoomManager).toBeTypeOf('function')
expect(RedisRoomManager).toBeTypeOf('function')
expect(authenticateSocket).toBeTypeOf('function')
expect(verifyWorkflowAccess).toBeTypeOf('function')
expect(getWorkflowState).toBeTypeOf('function')
expect(WorkflowOperationSchema).toBeDefined()
})
it.concurrent('should maintain all original functionality after refactoring', async () => {
  expect(httpServer).toBeDefined()
  expect(io).toBeDefined()
  expect(roomManager).toBeDefined()
  expect(typeof roomManager.addUserToRoom).toBe('function')
  expect(typeof roomManager.removeUserFromRoom).toBe('function')
  expect(typeof roomManager.handleWorkflowDeletion).toBe('function')
  expect(typeof roomManager.validateWorkflowConsistency).toBe('function')
  expect(typeof roomManager.broadcastPresenceUpdate).toBe('function')
})
})
@@ -286,6 +334,7 @@ describe('Socket Server Index Integration', () => {
it('should have shutdown capability', () => {
expect(typeof httpServer.close).toBe('function')
expect(typeof io.close).toBe('function')
expect(typeof roomManager.shutdown).toBe('function')
})
})

View File

@@ -1,112 +1,125 @@
import { createServer } from 'http'
import { createLogger } from '@sim/logger'
import type { Server as SocketIOServer } from 'socket.io'
import { env } from '@/lib/core/config/env'
import { createSocketIOServer, shutdownSocketIOAdapter } from '@/socket/config/socket'
import { setupAllHandlers } from '@/socket/handlers'
import { type AuthenticatedSocket, authenticateSocket } from '@/socket/middleware/auth'
import { type IRoomManager, MemoryRoomManager, RedisRoomManager } from '@/socket/rooms'
import { createHttpHandler } from '@/socket/routes/http'

const logger = createLogger('CollaborativeSocketServer')

/** Maximum time to wait for graceful shutdown before forcing exit */
const SHUTDOWN_TIMEOUT_MS = 10000

async function createRoomManager(io: SocketIOServer): Promise<IRoomManager> {
  if (env.REDIS_URL) {
    logger.info('Initializing Redis-backed RoomManager for multi-pod support')
    const manager = new RedisRoomManager(io, env.REDIS_URL)
    await manager.initialize()
    return manager
  }
  logger.warn('No REDIS_URL configured - using in-memory RoomManager (single-pod only)')
  const manager = new MemoryRoomManager(io)
  await manager.initialize()
  return manager
}

async function main() {
  const httpServer = createServer()
  const PORT = Number(env.PORT || env.SOCKET_PORT || 3002)

  logger.info('Starting Socket.IO server...', {
    port: PORT,
    nodeEnv: env.NODE_ENV,
    hasDatabase: !!env.DATABASE_URL,
    hasAuth: !!env.BETTER_AUTH_SECRET,
    hasRedis: !!env.REDIS_URL,
  })

  // Create Socket.IO server with Redis adapter if configured
  const io = await createSocketIOServer(httpServer)

  // Initialize room manager (Redis or in-memory based on config)
  const roomManager = await createRoomManager(io)

  // Set up authentication middleware
  io.use(authenticateSocket)

  // Set up HTTP handler for health checks and internal APIs
  const httpHandler = createHttpHandler(roomManager, logger)
  httpServer.on('request', httpHandler)

  // Global error handlers
  process.on('uncaughtException', (error) => {
    logger.error('Uncaught Exception:', error)
  })

  process.on('unhandledRejection', (reason, promise) => {
    logger.error('Unhandled Rejection at:', promise, 'reason:', reason)
  })

  httpServer.on('error', (error: NodeJS.ErrnoException) => {
    logger.error('HTTP server error:', error)
    if (error.code === 'EADDRINUSE' || error.code === 'EACCES') {
      process.exit(1)
    }
  })

  io.engine.on('connection_error', (err) => {
    logger.error('Socket.IO connection error:', {
      req: err.req?.url,
      code: err.code,
      message: err.message,
      context: err.context,
    })
  })

  io.on('connection', (socket: AuthenticatedSocket) => {
    logger.info(`New socket connection: ${socket.id}`)
    setupAllHandlers(socket, roomManager)
  })

  httpServer.listen(PORT, '0.0.0.0', () => {
    logger.info(`Socket.IO server running on port ${PORT}`)
    logger.info(`Health check available at: http://localhost:${PORT}/health`)
  })

  const shutdown = async () => {
    logger.info('Shutting down Socket.IO server...')
    try {
      await roomManager.shutdown()
      logger.info('RoomManager shutdown complete')
    } catch (error) {
      logger.error('Error during RoomManager shutdown:', error)
    }
    try {
      await shutdownSocketIOAdapter()
    } catch (error) {
      logger.error('Error during Socket.IO adapter shutdown:', error)
    }
    httpServer.close(() => {
      logger.info('Socket.IO server closed')
      process.exit(0)
    })
    setTimeout(() => {
      logger.error('Forced shutdown after timeout')
      process.exit(1)
    }, SHUTDOWN_TIMEOUT_MS)
  }

  process.on('SIGINT', shutdown)
  process.on('SIGTERM', shutdown)
}

// Start the server
main().catch((error) => {
  logger.error('Failed to start server:', error)
  process.exit(1)
})
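The graceful-shutdown path above runs the async cleanups first, then closes the HTTP server, with a forced exit after SHUTDOWN_TIMEOUT_MS as a backstop. A standalone sketch of that pattern (names here are illustrative; the real server plugs `roomManager.shutdown()` and `shutdownSocketIOAdapter()` into the cleanup list):

```typescript
// Generic shutdown sequencer: run cleanups in order (failures are logged,
// not fatal, matching the server above), then close, racing a deadline.
async function gracefulShutdown(
  cleanups: Array<() => Promise<void>>,
  close: (done: () => void) => void,
  timeoutMs: number
): Promise<'closed' | 'timeout'> {
  for (const cleanup of cleanups) {
    try {
      await cleanup()
    } catch (error) {
      console.error('cleanup failed:', error)
    }
  }
  return new Promise((resolve) => {
    const timer = setTimeout(() => resolve('timeout'), timeoutMs)
    close(() => {
      clearTimeout(timer)
      resolve('closed')
    })
  })
}

// A server whose close() calls back immediately resolves as 'closed'.
gracefulShutdown([async () => {}], (done) => done(), 1000).then((result) => {
  console.log(result) // prints 'closed'
})
```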

View File

@@ -21,7 +21,7 @@ export interface AuthenticatedSocket extends Socket {
* Socket.IO authentication middleware.
* Handles both anonymous mode (DISABLE_AUTH=true) and normal token-based auth.
*/
export async function authenticateSocket(socket: AuthenticatedSocket, next: (err?: Error) => void) {
try {
if (isAuthDisabled) {
socket.userId = ANONYMOUS_USER_ID

View File

@@ -73,7 +73,7 @@ export function checkRolePermission(
return { allowed: true }
}
async function verifyWorkspaceMembership(
userId: string,
workspaceId: string
): Promise<string | null> {

View File

@@ -0,0 +1,3 @@
export { MemoryRoomManager } from '@/socket/rooms/memory-manager'
export { RedisRoomManager } from '@/socket/rooms/redis-manager'
export type { IRoomManager, UserPresence, UserSession, WorkflowRoom } from '@/socket/rooms/types'
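Because this barrel exports one shared `UserPresence` shape for both backends, presence math can be written once against it. A minimal sketch (the `uniqueUserCount` helper is hypothetical, not part of this PR; it mirrors the managers' `getUniqueUserCount()` logic of counting distinct users rather than sockets):

```typescript
// Hypothetical helper over a trimmed UserPresence shape: a user with two
// open tabs has two sockets but should count once in presence totals.
type PresenceLite = { userId: string; socketId: string }

function uniqueUserCount(users: PresenceLite[]): number {
  return new Set(users.map((u) => u.userId)).size
}

// Two sockets for u1 (e.g. two browser tabs) plus one for u2 → 2 users.
const users: PresenceLite[] = [
  { userId: 'u1', socketId: 's1' },
  { userId: 'u1', socketId: 's2' },
  { userId: 'u2', socketId: 's3' },
]
console.log(uniqueUserCount(users)) // prints 2
```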

View File

@@ -1,291 +0,0 @@
import * as schema from '@sim/db/schema'
import { workflowBlocks, workflowEdges } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull } from 'drizzle-orm'
import { drizzle } from 'drizzle-orm/postgres-js'
import postgres from 'postgres'
import type { Server } from 'socket.io'
import { env } from '@/lib/core/config/env'
const connectionString = env.DATABASE_URL
const db = drizzle(
postgres(connectionString, {
prepare: false,
idle_timeout: 15,
connect_timeout: 20,
max: 3,
onnotice: () => {},
}),
{ schema }
)
const logger = createLogger('RoomManager')
export interface UserPresence {
userId: string
workflowId: string
userName: string
socketId: string
joinedAt: number
lastActivity: number
role: string
cursor?: { x: number; y: number }
selection?: { type: 'block' | 'edge' | 'none'; id?: string }
avatarUrl?: string | null
}
export interface WorkflowRoom {
workflowId: string
users: Map<string, UserPresence> // socketId -> UserPresence
lastModified: number
activeConnections: number
}
export class RoomManager {
private workflowRooms = new Map<string, WorkflowRoom>()
private socketToWorkflow = new Map<string, string>()
private userSessions = new Map<
string,
{ userId: string; userName: string; avatarUrl?: string | null }
>()
private io: Server
constructor(io: Server) {
this.io = io
}
createWorkflowRoom(workflowId: string): WorkflowRoom {
return {
workflowId,
users: new Map(),
lastModified: Date.now(),
activeConnections: 0,
}
}
cleanupUserFromRoom(socketId: string, workflowId: string) {
const room = this.workflowRooms.get(workflowId)
if (room) {
room.users.delete(socketId)
room.activeConnections = Math.max(0, room.activeConnections - 1)
if (room.activeConnections === 0) {
this.workflowRooms.delete(workflowId)
logger.info(`Cleaned up empty workflow room: ${workflowId}`)
}
}
this.socketToWorkflow.delete(socketId)
this.userSessions.delete(socketId)
}
handleWorkflowDeletion(workflowId: string) {
logger.info(`Handling workflow deletion notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for deleted workflow ${workflowId}`)
return
}
this.io.to(workflowId).emit('workflow-deleted', {
workflowId,
message: 'This workflow has been deleted',
timestamp: Date.now(),
})
const socketsToDisconnect: string[] = []
room.users.forEach((_presence, socketId) => {
socketsToDisconnect.push(socketId)
})
socketsToDisconnect.forEach((socketId) => {
const socket = this.io.sockets.sockets.get(socketId)
if (socket) {
socket.leave(workflowId)
logger.debug(`Disconnected socket ${socketId} from deleted workflow ${workflowId}`)
}
this.cleanupUserFromRoom(socketId, workflowId)
})
this.workflowRooms.delete(workflowId)
logger.info(
`Cleaned up workflow room ${workflowId} after deletion (${socketsToDisconnect.length} users disconnected)`
)
}
handleWorkflowRevert(workflowId: string, timestamp: number) {
logger.info(`Handling workflow revert notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for reverted workflow ${workflowId}`)
return
}
this.io.to(workflowId).emit('workflow-reverted', {
workflowId,
message: 'Workflow has been reverted to deployed state',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about workflow revert: ${workflowId}`)
}
handleWorkflowUpdate(workflowId: string) {
logger.info(`Handling workflow update notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for updated workflow ${workflowId}`)
return
}
const timestamp = Date.now()
// Notify all clients in the workflow room that the workflow has been updated
// This will trigger them to refresh their local state
this.io.to(workflowId).emit('workflow-updated', {
workflowId,
message: 'Workflow has been updated externally',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about workflow update: ${workflowId}`)
}
handleCopilotWorkflowEdit(workflowId: string, description?: string) {
logger.info(`Handling copilot workflow edit notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for copilot workflow edit ${workflowId}`)
return
}
const timestamp = Date.now()
// Emit special event for copilot edits that tells clients to rehydrate from database
this.io.to(workflowId).emit('copilot-workflow-edit', {
workflowId,
description,
message: 'Copilot has edited the workflow - rehydrating from database',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about copilot workflow edit: ${workflowId}`)
}
async validateWorkflowConsistency(
workflowId: string
): Promise<{ valid: boolean; issues: string[] }> {
try {
const issues: string[] = []
const orphanedEdges = await db
.select({
id: workflowEdges.id,
sourceBlockId: workflowEdges.sourceBlockId,
targetBlockId: workflowEdges.targetBlockId,
})
.from(workflowEdges)
.leftJoin(workflowBlocks, eq(workflowEdges.sourceBlockId, workflowBlocks.id))
.where(and(eq(workflowEdges.workflowId, workflowId), isNull(workflowBlocks.id)))
if (orphanedEdges.length > 0) {
issues.push(`Found ${orphanedEdges.length} orphaned edges with missing source blocks`)
}
return { valid: issues.length === 0, issues }
} catch (error) {
logger.error('Error validating workflow consistency:', error)
return { valid: false, issues: ['Consistency check failed'] }
}
}
getWorkflowRooms(): ReadonlyMap<string, WorkflowRoom> {
return this.workflowRooms
}
getSocketToWorkflow(): ReadonlyMap<string, string> {
return this.socketToWorkflow
}
getUserSessions(): ReadonlyMap<string, { userId: string; userName: string }> {
return this.userSessions
}
hasWorkflowRoom(workflowId: string): boolean {
return this.workflowRooms.has(workflowId)
}
getWorkflowRoom(workflowId: string): WorkflowRoom | undefined {
return this.workflowRooms.get(workflowId)
}
setWorkflowRoom(workflowId: string, room: WorkflowRoom): void {
this.workflowRooms.set(workflowId, room)
}
getWorkflowIdForSocket(socketId: string): string | undefined {
return this.socketToWorkflow.get(socketId)
}
setWorkflowForSocket(socketId: string, workflowId: string): void {
this.socketToWorkflow.set(socketId, workflowId)
}
getUserSession(
socketId: string
): { userId: string; userName: string; avatarUrl?: string | null } | undefined {
return this.userSessions.get(socketId)
}
setUserSession(
socketId: string,
session: { userId: string; userName: string; avatarUrl?: string | null }
): void {
this.userSessions.set(socketId, session)
}
getTotalActiveConnections(): number {
return Array.from(this.workflowRooms.values()).reduce(
(total, room) => total + room.activeConnections,
0
)
}
broadcastPresenceUpdate(workflowId: string): void {
const room = this.workflowRooms.get(workflowId)
if (room) {
const roomPresence = Array.from(room.users.values())
this.io.to(workflowId).emit('presence-update', roomPresence)
}
}
emitToWorkflow<T = unknown>(workflowId: string, event: string, payload: T): void {
this.io.to(workflowId).emit(event, payload)
}
/**
* Get the number of unique users in a workflow room
* (not the number of socket connections)
*/
getUniqueUserCount(workflowId: string): number {
const room = this.workflowRooms.get(workflowId)
if (!room) return 0
const uniqueUsers = new Set<string>()
room.users.forEach((presence) => {
uniqueUsers.add(presence.userId)
})
return uniqueUsers.size
}
}

View File

@@ -0,0 +1,260 @@
import { createLogger } from '@sim/logger'
import type { Server } from 'socket.io'
import type { IRoomManager, UserPresence, UserSession, WorkflowRoom } from '@/socket/rooms/types'
const logger = createLogger('MemoryRoomManager')
/**
* In-memory room manager for single-pod deployments
* Used as fallback when REDIS_URL is not configured
*/
export class MemoryRoomManager implements IRoomManager {
private workflowRooms = new Map<string, WorkflowRoom>()
private socketToWorkflow = new Map<string, string>()
private userSessions = new Map<string, UserSession>()
private _io: Server
constructor(io: Server) {
this._io = io
}
get io(): Server {
return this._io
}
async initialize(): Promise<void> {
logger.info('MemoryRoomManager initialized (single-pod mode)')
}
async shutdown(): Promise<void> {
this.workflowRooms.clear()
this.socketToWorkflow.clear()
this.userSessions.clear()
logger.info('MemoryRoomManager shutdown complete')
}
async addUserToRoom(workflowId: string, socketId: string, presence: UserPresence): Promise<void> {
// Create room if it doesn't exist
if (!this.workflowRooms.has(workflowId)) {
this.workflowRooms.set(workflowId, {
workflowId,
users: new Map(),
lastModified: Date.now(),
activeConnections: 0,
})
}
const room = this.workflowRooms.get(workflowId)!
room.users.set(socketId, presence)
room.activeConnections++
room.lastModified = Date.now()
// Map socket to workflow
this.socketToWorkflow.set(socketId, workflowId)
// Store session
this.userSessions.set(socketId, {
userId: presence.userId,
userName: presence.userName,
avatarUrl: presence.avatarUrl,
})
logger.debug(`Added user ${presence.userId} to workflow ${workflowId} (socket: ${socketId})`)
}
async removeUserFromRoom(socketId: string): Promise<string | null> {
const workflowId = this.socketToWorkflow.get(socketId)
if (!workflowId) {
return null
}
const room = this.workflowRooms.get(workflowId)
if (room) {
room.users.delete(socketId)
room.activeConnections = Math.max(0, room.activeConnections - 1)
// Clean up empty rooms
if (room.activeConnections === 0) {
this.workflowRooms.delete(workflowId)
logger.info(`Cleaned up empty workflow room: ${workflowId}`)
}
}
this.socketToWorkflow.delete(socketId)
this.userSessions.delete(socketId)
logger.debug(`Removed socket ${socketId} from workflow ${workflowId}`)
return workflowId
}
async getWorkflowIdForSocket(socketId: string): Promise<string | null> {
return this.socketToWorkflow.get(socketId) ?? null
}
async getUserSession(socketId: string): Promise<UserSession | null> {
return this.userSessions.get(socketId) ?? null
}
async getWorkflowUsers(workflowId: string): Promise<UserPresence[]> {
const room = this.workflowRooms.get(workflowId)
if (!room) return []
return Array.from(room.users.values())
}
async hasWorkflowRoom(workflowId: string): Promise<boolean> {
return this.workflowRooms.has(workflowId)
}
async updateUserActivity(
workflowId: string,
socketId: string,
updates: Partial<Pick<UserPresence, 'cursor' | 'selection' | 'lastActivity'>>
): Promise<void> {
const room = this.workflowRooms.get(workflowId)
if (!room) return
const presence = room.users.get(socketId)
if (presence) {
if (updates.cursor !== undefined) presence.cursor = updates.cursor
if (updates.selection !== undefined) presence.selection = updates.selection
presence.lastActivity = updates.lastActivity ?? Date.now()
}
}
async updateRoomLastModified(workflowId: string): Promise<void> {
const room = this.workflowRooms.get(workflowId)
if (room) {
room.lastModified = Date.now()
}
}
async broadcastPresenceUpdate(workflowId: string): Promise<void> {
const users = await this.getWorkflowUsers(workflowId)
this._io.to(workflowId).emit('presence-update', users)
}
emitToWorkflow<T = unknown>(workflowId: string, event: string, payload: T): void {
this._io.to(workflowId).emit(event, payload)
}
async getUniqueUserCount(workflowId: string): Promise<number> {
const room = this.workflowRooms.get(workflowId)
if (!room) return 0
const uniqueUsers = new Set<string>()
room.users.forEach((presence) => {
uniqueUsers.add(presence.userId)
})
return uniqueUsers.size
}
async getTotalActiveConnections(): Promise<number> {
let total = 0
for (const room of this.workflowRooms.values()) {
total += room.activeConnections
}
return total
}
async handleWorkflowDeletion(workflowId: string): Promise<void> {
logger.info(`Handling workflow deletion notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for deleted workflow ${workflowId}`)
return
}
this._io.to(workflowId).emit('workflow-deleted', {
workflowId,
message: 'This workflow has been deleted',
timestamp: Date.now(),
})
const socketsToDisconnect: string[] = []
room.users.forEach((_presence, socketId) => {
socketsToDisconnect.push(socketId)
})
for (const socketId of socketsToDisconnect) {
const socket = this._io.sockets.sockets.get(socketId)
if (socket) {
socket.leave(workflowId)
logger.debug(`Disconnected socket ${socketId} from deleted workflow ${workflowId}`)
}
await this.removeUserFromRoom(socketId)
}
this.workflowRooms.delete(workflowId)
logger.info(
`Cleaned up workflow room ${workflowId} after deletion (${socketsToDisconnect.length} users disconnected)`
)
}
async handleWorkflowRevert(workflowId: string, timestamp: number): Promise<void> {
logger.info(`Handling workflow revert notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for reverted workflow ${workflowId}`)
return
}
this._io.to(workflowId).emit('workflow-reverted', {
workflowId,
message: 'Workflow has been reverted to deployed state',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about workflow revert: ${workflowId}`)
}
async handleWorkflowUpdate(workflowId: string): Promise<void> {
logger.info(`Handling workflow update notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for updated workflow ${workflowId}`)
return
}
const timestamp = Date.now()
this._io.to(workflowId).emit('workflow-updated', {
workflowId,
message: 'Workflow has been updated externally',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about workflow update: ${workflowId}`)
}
async handleCopilotWorkflowEdit(workflowId: string, description?: string): Promise<void> {
logger.info(`Handling copilot workflow edit notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for copilot workflow edit ${workflowId}`)
return
}
const timestamp = Date.now()
this._io.to(workflowId).emit('copilot-workflow-edit', {
workflowId,
description,
message: 'Copilot has edited the workflow - rehydrating from database',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about copilot workflow edit: ${workflowId}`)
}
}
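The lifecycle rule the in-memory manager above enforces is: a room is created on first join and deleted the moment its last connection leaves. A standalone sketch with the Socket.IO and logging dependencies stripped out (class and field names are simplified stand-ins, not the real exports):

```typescript
// Minimal stand-in for MemoryRoomManager's bookkeeping, exercising only
// the join/leave maps and the empty-room cleanup rule.
type Presence = { userId: string; socketId: string }

class MiniRoomManager {
  private rooms = new Map<string, Map<string, Presence>>()
  private socketToWorkflow = new Map<string, string>()

  addUserToRoom(workflowId: string, socketId: string, p: Presence): void {
    if (!this.rooms.has(workflowId)) this.rooms.set(workflowId, new Map())
    this.rooms.get(workflowId)!.set(socketId, p)
    this.socketToWorkflow.set(socketId, workflowId)
  }

  removeUserFromRoom(socketId: string): string | null {
    const workflowId = this.socketToWorkflow.get(socketId)
    if (!workflowId) return null
    const room = this.rooms.get(workflowId)
    room?.delete(socketId)
    if (room && room.size === 0) this.rooms.delete(workflowId) // empty-room cleanup
    this.socketToWorkflow.delete(socketId)
    return workflowId
  }

  hasRoom(workflowId: string): boolean {
    return this.rooms.has(workflowId)
  }
}

const mgr = new MiniRoomManager()
mgr.addUserToRoom('wf-1', 's1', { userId: 'u1', socketId: 's1' })
mgr.addUserToRoom('wf-1', 's2', { userId: 'u2', socketId: 's2' })
mgr.removeUserFromRoom('s1')
console.log(mgr.hasRoom('wf-1')) // one connection left → true
mgr.removeUserFromRoom('s2')
console.log(mgr.hasRoom('wf-1')) // last connection gone → false
```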

View File

@@ -0,0 +1,434 @@
import { createLogger } from '@sim/logger'
import { createClient, type RedisClientType } from 'redis'
import type { Server } from 'socket.io'
import type { IRoomManager, UserPresence, UserSession } from '@/socket/rooms/types'
const logger = createLogger('RedisRoomManager')
const KEYS = {
workflowUsers: (wfId: string) => `workflow:${wfId}:users`,
workflowMeta: (wfId: string) => `workflow:${wfId}:meta`,
socketWorkflow: (socketId: string) => `socket:${socketId}:workflow`,
socketSession: (socketId: string) => `socket:${socketId}:session`,
} as const
const SOCKET_KEY_TTL = 3600 // seconds (Redis EXPIRE); refreshed on activity
/**
* Lua script for atomic user removal from room.
* Returns workflowId if user was removed, null otherwise.
* Handles room cleanup atomically to prevent race conditions.
*/
const REMOVE_USER_SCRIPT = `
local socketWorkflowKey = KEYS[1]
local socketSessionKey = KEYS[2]
local workflowUsersPrefix = ARGV[1]
local workflowMetaPrefix = ARGV[2]
local socketId = ARGV[3]
local workflowId = redis.call('GET', socketWorkflowKey)
if not workflowId then
return nil
end
local workflowUsersKey = workflowUsersPrefix .. workflowId .. ':users'
local workflowMetaKey = workflowMetaPrefix .. workflowId .. ':meta'
redis.call('HDEL', workflowUsersKey, socketId)
redis.call('DEL', socketWorkflowKey, socketSessionKey)
local remaining = redis.call('HLEN', workflowUsersKey)
if remaining == 0 then
redis.call('DEL', workflowUsersKey, workflowMetaKey)
end
return workflowId
`
/**
* Lua script for atomic user activity update.
* Performs read-modify-write atomically to prevent lost updates.
* Also refreshes TTL on socket keys to prevent expiry during long sessions.
*/
const UPDATE_ACTIVITY_SCRIPT = `
local workflowUsersKey = KEYS[1]
local socketWorkflowKey = KEYS[2]
local socketSessionKey = KEYS[3]
local socketId = ARGV[1]
local cursorJson = ARGV[2]
local selectionJson = ARGV[3]
local lastActivity = ARGV[4]
local ttl = tonumber(ARGV[5])
local existingJson = redis.call('HGET', workflowUsersKey, socketId)
if not existingJson then
return 0
end
local existing = cjson.decode(existingJson)
if cursorJson ~= '' then
existing.cursor = cjson.decode(cursorJson)
end
if selectionJson ~= '' then
existing.selection = cjson.decode(selectionJson)
end
existing.lastActivity = tonumber(lastActivity)
redis.call('HSET', workflowUsersKey, socketId, cjson.encode(existing))
redis.call('EXPIRE', socketWorkflowKey, ttl)
redis.call('EXPIRE', socketSessionKey, ttl)
return 1
`
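Both scripts execute as a single atomic unit inside Redis, so a concurrent join or leave on another pod can never observe a half-finished removal. The control flow of REMOVE_USER_SCRIPT can be mirrored in plain TypeScript against a Map-based stand-in store (a simulation only: it reproduces the cleanup logic, not the atomicity that motivates the Lua version):

```typescript
// Simulation of REMOVE_USER_SCRIPT against in-process maps standing in
// for Redis string keys and hashes.
const kv = new Map<string, string>() // socket:* string keys
const hashes = new Map<string, Map<string, string>>() // workflow:*:users hashes

function removeUser(socketId: string): string | null {
  const workflowId = kv.get(`socket:${socketId}:workflow`) ?? null
  if (!workflowId) return null // GET returned nil → nothing to do
  const usersKey = `workflow:${workflowId}:users`
  hashes.get(usersKey)?.delete(socketId) // HDEL
  kv.delete(`socket:${socketId}:workflow`) // DEL socket keys
  kv.delete(`socket:${socketId}:session`)
  if ((hashes.get(usersKey)?.size ?? 0) === 0) {
    hashes.delete(usersKey) // room empty → drop users hash and meta
    kv.delete(`workflow:${workflowId}:meta`)
  }
  return workflowId
}

// Seed one user and remove them: the room's keys disappear with them.
kv.set('socket:s1:workflow', 'wf-1')
hashes.set('workflow:wf-1:users', new Map([['s1', '{"userId":"u1"}']]))
console.log(removeUser('s1')) // prints 'wf-1'
console.log(hashes.has('workflow:wf-1:users')) // prints false
```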
/**
* Redis-backed room manager for multi-pod deployments.
* Uses Lua scripts for atomic operations to prevent race conditions.
*/
export class RedisRoomManager implements IRoomManager {
private redis: RedisClientType
private _io: Server
private isConnected = false
private removeUserScriptSha: string | null = null
private updateActivityScriptSha: string | null = null
constructor(io: Server, redisUrl: string) {
this._io = io
this.redis = createClient({
url: redisUrl,
socket: {
reconnectStrategy: (retries) => {
if (retries > 10) {
logger.error('Redis reconnection failed after 10 attempts')
return new Error('Redis reconnection failed')
}
const delay = Math.min(retries * 100, 3000)
logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`)
return delay
},
},
})
this.redis.on('error', (err) => {
logger.error('Redis client error:', err)
})
this.redis.on('reconnecting', () => {
logger.warn('Redis client reconnecting...')
this.isConnected = false
})
this.redis.on('ready', () => {
logger.info('Redis client ready')
this.isConnected = true
})
}
get io(): Server {
return this._io
}
async initialize(): Promise<void> {
if (this.isConnected) return
try {
await this.redis.connect()
this.isConnected = true
// Pre-load Lua scripts for better performance
this.removeUserScriptSha = await this.redis.scriptLoad(REMOVE_USER_SCRIPT)
this.updateActivityScriptSha = await this.redis.scriptLoad(UPDATE_ACTIVITY_SCRIPT)
logger.info('RedisRoomManager connected to Redis and scripts loaded')
} catch (error) {
logger.error('Failed to connect to Redis:', error)
throw error
}
}
async shutdown(): Promise<void> {
if (!this.isConnected) return
try {
await this.redis.quit()
this.isConnected = false
logger.info('RedisRoomManager disconnected from Redis')
} catch (error) {
logger.error('Error during Redis shutdown:', error)
}
}
async addUserToRoom(workflowId: string, socketId: string, presence: UserPresence): Promise<void> {
try {
const pipeline = this.redis.multi()
pipeline.hSet(KEYS.workflowUsers(workflowId), socketId, JSON.stringify(presence))
pipeline.hSet(KEYS.workflowMeta(workflowId), 'lastModified', Date.now().toString())
pipeline.set(KEYS.socketWorkflow(socketId), workflowId)
pipeline.expire(KEYS.socketWorkflow(socketId), SOCKET_KEY_TTL)
pipeline.hSet(KEYS.socketSession(socketId), {
userId: presence.userId,
userName: presence.userName,
avatarUrl: presence.avatarUrl || '',
})
pipeline.expire(KEYS.socketSession(socketId), SOCKET_KEY_TTL)
const results = await pipeline.exec()
// Check if any command failed
const failed = results.some((result) => result instanceof Error)
if (failed) {
logger.error(`Pipeline partially failed when adding user to room`, { workflowId, socketId })
throw new Error('Failed to store user session data in Redis')
}
logger.debug(`Added user ${presence.userId} to workflow ${workflowId} (socket: ${socketId})`)
} catch (error) {
logger.error(`Failed to add user to room: ${socketId} -> ${workflowId}`, error)
throw error
}
}
async removeUserFromRoom(socketId: string, retried = false): Promise<string | null> {
if (!this.removeUserScriptSha) {
logger.error('removeUserFromRoom called before initialize()')
return null
}
try {
const workflowId = await this.redis.evalSha(this.removeUserScriptSha, {
keys: [KEYS.socketWorkflow(socketId), KEYS.socketSession(socketId)],
arguments: ['workflow:', 'workflow:', socketId],
})
if (workflowId) {
logger.debug(`Removed socket ${socketId} from workflow ${workflowId}`)
}
return workflowId as string | null
} catch (error) {
if ((error as Error).message?.includes('NOSCRIPT') && !retried) {
logger.warn('Lua script not found, reloading...')
this.removeUserScriptSha = await this.redis.scriptLoad(REMOVE_USER_SCRIPT)
return this.removeUserFromRoom(socketId, true)
}
logger.error(`Failed to remove user from room: ${socketId}`, error)
return null
}
}
async getWorkflowIdForSocket(socketId: string): Promise<string | null> {
return this.redis.get(KEYS.socketWorkflow(socketId))
}
async getUserSession(socketId: string): Promise<UserSession | null> {
try {
const session = await this.redis.hGetAll(KEYS.socketSession(socketId))
if (!session.userId) {
return null
}
return {
userId: session.userId,
userName: session.userName,
avatarUrl: session.avatarUrl || undefined,
}
} catch (error) {
logger.error(`Failed to get user session for ${socketId}:`, error)
return null
}
}
async getWorkflowUsers(workflowId: string): Promise<UserPresence[]> {
try {
const users = await this.redis.hGetAll(KEYS.workflowUsers(workflowId))
return Object.entries(users)
.map(([socketId, json]) => {
try {
return JSON.parse(json) as UserPresence
} catch {
logger.warn(`Corrupted user data for socket ${socketId}, skipping`)
return null
}
})
.filter((u): u is UserPresence => u !== null)
} catch (error) {
logger.error(`Failed to get workflow users for ${workflowId}:`, error)
return []
}
}
async hasWorkflowRoom(workflowId: string): Promise<boolean> {
const exists = await this.redis.exists(KEYS.workflowUsers(workflowId))
return exists > 0
}
async updateUserActivity(
workflowId: string,
socketId: string,
updates: Partial<Pick<UserPresence, 'cursor' | 'selection' | 'lastActivity'>>,
retried = false
): Promise<void> {
if (!this.updateActivityScriptSha) {
logger.error('updateUserActivity called before initialize()')
return
}
try {
await this.redis.evalSha(this.updateActivityScriptSha, {
keys: [
KEYS.workflowUsers(workflowId),
KEYS.socketWorkflow(socketId),
KEYS.socketSession(socketId),
],
arguments: [
socketId,
updates.cursor !== undefined ? JSON.stringify(updates.cursor) : '',
updates.selection !== undefined ? JSON.stringify(updates.selection) : '',
(updates.lastActivity ?? Date.now()).toString(),
SOCKET_KEY_TTL.toString(),
],
})
} catch (error) {
if ((error as Error).message?.includes('NOSCRIPT') && !retried) {
logger.warn('Lua script not found, reloading...')
this.updateActivityScriptSha = await this.redis.scriptLoad(UPDATE_ACTIVITY_SCRIPT)
return this.updateUserActivity(workflowId, socketId, updates, true)
}
logger.error(`Failed to update user activity: ${socketId}`, error)
}
}
async updateRoomLastModified(workflowId: string): Promise<void> {
await this.redis.hSet(KEYS.workflowMeta(workflowId), 'lastModified', Date.now().toString())
}
async broadcastPresenceUpdate(workflowId: string): Promise<void> {
const users = await this.getWorkflowUsers(workflowId)
// io.to() with Redis adapter broadcasts to all pods
this._io.to(workflowId).emit('presence-update', users)
}
emitToWorkflow<T = unknown>(workflowId: string, event: string, payload: T): void {
this._io.to(workflowId).emit(event, payload)
}
async getUniqueUserCount(workflowId: string): Promise<number> {
const users = await this.getWorkflowUsers(workflowId)
const uniqueUserIds = new Set(users.map((u) => u.userId))
return uniqueUserIds.size
}
async getTotalActiveConnections(): Promise<number> {
// Counting connections across pods would require scanning all workflow:*:users
// keys in Redis and aggregating. For now, return only the socket count for
// this server instance.
return this._io.sockets.sockets.size
}
async handleWorkflowDeletion(workflowId: string): Promise<void> {
logger.info(`Handling workflow deletion notification for ${workflowId}`)
try {
const users = await this.getWorkflowUsers(workflowId)
if (users.length === 0) {
logger.debug(`No active users found for deleted workflow ${workflowId}`)
return
}
// Notify all clients across all pods via Redis adapter
this._io.to(workflowId).emit('workflow-deleted', {
workflowId,
message: 'This workflow has been deleted',
timestamp: Date.now(),
})
// Use Socket.IO's cross-pod socketsLeave() to remove all sockets from the room
// This works across all pods when using the Redis adapter
await this._io.in(workflowId).socketsLeave(workflowId)
logger.debug(`All sockets left workflow room ${workflowId} via socketsLeave()`)
// Remove all users from Redis state
for (const user of users) {
await this.removeUserFromRoom(user.socketId)
}
// Clean up room data
await this.redis.del([KEYS.workflowUsers(workflowId), KEYS.workflowMeta(workflowId)])
logger.info(
`Cleaned up workflow room ${workflowId} after deletion (${users.length} users disconnected)`
)
} catch (error) {
logger.error(`Failed to handle workflow deletion for ${workflowId}:`, error)
}
}
async handleWorkflowRevert(workflowId: string, timestamp: number): Promise<void> {
logger.info(`Handling workflow revert notification for ${workflowId}`)
const hasRoom = await this.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`No active room found for reverted workflow ${workflowId}`)
return
}
this._io.to(workflowId).emit('workflow-reverted', {
workflowId,
message: 'Workflow has been reverted to deployed state',
timestamp,
})
await this.updateRoomLastModified(workflowId)
const userCount = await this.getUniqueUserCount(workflowId)
logger.info(`Notified ${userCount} users about workflow revert: ${workflowId}`)
}
async handleWorkflowUpdate(workflowId: string): Promise<void> {
logger.info(`Handling workflow update notification for ${workflowId}`)
const hasRoom = await this.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`No active room found for updated workflow ${workflowId}`)
return
}
const timestamp = Date.now()
this._io.to(workflowId).emit('workflow-updated', {
workflowId,
message: 'Workflow has been updated externally',
timestamp,
})
await this.updateRoomLastModified(workflowId)
const userCount = await this.getUniqueUserCount(workflowId)
logger.info(`Notified ${userCount} users about workflow update: ${workflowId}`)
}
async handleCopilotWorkflowEdit(workflowId: string, description?: string): Promise<void> {
logger.info(`Handling copilot workflow edit notification for ${workflowId}`)
const hasRoom = await this.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`No active room found for copilot workflow edit ${workflowId}`)
return
}
const timestamp = Date.now()
this._io.to(workflowId).emit('copilot-workflow-edit', {
workflowId,
description,
message: 'Copilot has edited the workflow - rehydrating from database',
timestamp,
})
await this.updateRoomLastModified(workflowId)
const userCount = await this.getUniqueUserCount(workflowId)
logger.info(`Notified ${userCount} users about copilot workflow edit: ${workflowId}`)
}
}

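The NOSCRIPT fallback used by `removeUserFromRoom` and `updateUserActivity` (cached SHA first, reload-and-retry once, then give up) can be sketched as a generic helper. This is a minimal illustration, not part of the class: `evalWithReload`, `run`, and `load` are hypothetical names.

```typescript
// Hypothetical sketch of the NOSCRIPT fallback pattern used above: try the
// cached script SHA, reload the script once if Redis reports NOSCRIPT (e.g.
// after a restart or failover cleared the script cache), rethrow other errors.
async function evalWithReload<T>(
  sha: string,
  run: (sha: string) => Promise<T>,
  load: () => Promise<string>
): Promise<T> {
  try {
    return await run(sha)
  } catch (error) {
    if ((error as Error).message?.includes('NOSCRIPT')) {
      const freshSha = await load() // re-register the script, get a fresh SHA
      return run(freshSha)
    }
    throw error
  }
}
```

The `retried` flag in the real methods serves the same purpose as the single reload here: at most one reload attempt per call, so a persistently failing script cannot recurse forever.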

@@ -0,0 +1,140 @@
import type { Server } from 'socket.io'
/**
* User presence data stored in room state
*/
export interface UserPresence {
userId: string
workflowId: string
userName: string
socketId: string
tabSessionId?: string
joinedAt: number
lastActivity: number
role: string
cursor?: { x: number; y: number }
selection?: { type: 'block' | 'edge' | 'none'; id?: string }
avatarUrl?: string | null
}
/**
* User session data (minimal info for quick lookups)
*/
export interface UserSession {
userId: string
userName: string
avatarUrl?: string | null
}
/**
* Workflow room state
*/
export interface WorkflowRoom {
workflowId: string
users: Map<string, UserPresence>
lastModified: number
activeConnections: number
}
/**
* Common interface for room managers (in-memory and Redis)
* All methods that access state are async to support Redis operations
*/
export interface IRoomManager {
readonly io: Server
/**
* Initialize the room manager (connect to Redis, etc.)
*/
initialize(): Promise<void>
/**
* Clean shutdown
*/
shutdown(): Promise<void>
/**
* Add a user to a workflow room
*/
addUserToRoom(workflowId: string, socketId: string, presence: UserPresence): Promise<void>
/**
* Remove a user from their current room
* Returns the workflowId they were in, or null if not in any room
*/
removeUserFromRoom(socketId: string): Promise<string | null>
/**
* Get the workflow ID for a socket
*/
getWorkflowIdForSocket(socketId: string): Promise<string | null>
/**
* Get user session data for a socket
*/
getUserSession(socketId: string): Promise<UserSession | null>
/**
* Get all users in a workflow room
*/
getWorkflowUsers(workflowId: string): Promise<UserPresence[]>
/**
* Check if a workflow room exists
*/
hasWorkflowRoom(workflowId: string): Promise<boolean>
/**
* Update user activity (cursor, selection, lastActivity)
*/
updateUserActivity(
workflowId: string,
socketId: string,
updates: Partial<Pick<UserPresence, 'cursor' | 'selection' | 'lastActivity'>>
): Promise<void>
/**
* Update room's lastModified timestamp
*/
updateRoomLastModified(workflowId: string): Promise<void>
/**
* Broadcast presence update to all clients in a workflow room
*/
broadcastPresenceUpdate(workflowId: string): Promise<void>
/**
* Emit an event to all clients in a workflow room
*/
emitToWorkflow<T = unknown>(workflowId: string, event: string, payload: T): void
/**
* Get the number of unique users in a workflow room
*/
getUniqueUserCount(workflowId: string): Promise<number>
/**
* Get total active connections across all rooms
*/
getTotalActiveConnections(): Promise<number>
/**
* Handle workflow deletion - notify users and clean up room
*/
handleWorkflowDeletion(workflowId: string): Promise<void>
/**
* Handle workflow revert - notify users
*/
handleWorkflowRevert(workflowId: string, timestamp: number): Promise<void>
/**
* Handle workflow update - notify users
*/
handleWorkflowUpdate(workflowId: string): Promise<void>
/**
* Handle copilot workflow edit - notify users to rehydrate
*/
handleCopilotWorkflowEdit(workflowId: string, description?: string): Promise<void>
}


@@ -1,11 +1,52 @@
import type { IncomingMessage, ServerResponse } from 'http'
import type { RoomManager } from '@/socket/rooms/manager'
import { env } from '@/lib/core/config/env'
import type { IRoomManager } from '@/socket/rooms'
interface Logger {
info: (message: string, ...args: any[]) => void
error: (message: string, ...args: any[]) => void
debug: (message: string, ...args: any[]) => void
warn: (message: string, ...args: any[]) => void
info: (message: string, ...args: unknown[]) => void
error: (message: string, ...args: unknown[]) => void
debug: (message: string, ...args: unknown[]) => void
warn: (message: string, ...args: unknown[]) => void
}
function checkInternalApiKey(req: IncomingMessage): { success: boolean; error?: string } {
const apiKey = req.headers['x-api-key']
const expectedApiKey = env.INTERNAL_API_SECRET
if (!expectedApiKey) {
return { success: false, error: 'Internal API key not configured' }
}
if (!apiKey) {
return { success: false, error: 'API key required' }
}
if (apiKey !== expectedApiKey) {
return { success: false, error: 'Invalid API key' }
}
return { success: true }
}
function readRequestBody(req: IncomingMessage): Promise<string> {
return new Promise((resolve, reject) => {
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => resolve(body))
req.on('error', reject)
})
}
function sendSuccess(res: ServerResponse): void {
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
}
function sendError(res: ServerResponse, message: string, status = 500): void {
res.writeHead(status, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: message }))
}
/**
@@ -14,101 +55,91 @@ interface Logger {
* @param logger - Logger instance for logging requests and errors
* @returns HTTP request handler function
*/
export function createHttpHandler(roomManager: RoomManager, logger: Logger) {
return (req: IncomingMessage, res: ServerResponse) => {
export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
return async (req: IncomingMessage, res: ServerResponse) => {
// Health check doesn't require auth
if (req.method === 'GET' && req.url === '/health') {
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(
JSON.stringify({
status: 'ok',
timestamp: new Date().toISOString(),
connections: roomManager.getTotalActiveConnections(),
})
)
try {
const connections = await roomManager.getTotalActiveConnections()
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(
JSON.stringify({
status: 'ok',
timestamp: new Date().toISOString(),
connections,
})
)
} catch (error) {
logger.error('Error in health check:', error)
res.writeHead(503, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ status: 'error', message: 'Health check failed' }))
}
return
}
// All POST endpoints require internal API key authentication
if (req.method === 'POST') {
const authResult = checkInternalApiKey(req)
if (!authResult.success) {
res.writeHead(401, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: authResult.error }))
return
}
}
// Handle workflow deletion notifications from the main API
if (req.method === 'POST' && req.url === '/api/workflow-deleted') {
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => {
try {
const { workflowId } = JSON.parse(body)
roomManager.handleWorkflowDeletion(workflowId)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling workflow deletion notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process deletion notification' }))
}
})
try {
const body = await readRequestBody(req)
const { workflowId } = JSON.parse(body)
await roomManager.handleWorkflowDeletion(workflowId)
sendSuccess(res)
} catch (error) {
logger.error('Error handling workflow deletion notification:', error)
sendError(res, 'Failed to process deletion notification')
}
return
}
// Handle workflow update notifications from the main API
if (req.method === 'POST' && req.url === '/api/workflow-updated') {
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => {
try {
const { workflowId } = JSON.parse(body)
roomManager.handleWorkflowUpdate(workflowId)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling workflow update notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process update notification' }))
}
})
try {
const body = await readRequestBody(req)
const { workflowId } = JSON.parse(body)
await roomManager.handleWorkflowUpdate(workflowId)
sendSuccess(res)
} catch (error) {
logger.error('Error handling workflow update notification:', error)
sendError(res, 'Failed to process update notification')
}
return
}
// Handle copilot workflow edit notifications from the main API
if (req.method === 'POST' && req.url === '/api/copilot-workflow-edit') {
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => {
try {
const { workflowId, description } = JSON.parse(body)
roomManager.handleCopilotWorkflowEdit(workflowId, description)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling copilot workflow edit notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process copilot edit notification' }))
}
})
try {
const body = await readRequestBody(req)
const { workflowId, description } = JSON.parse(body)
await roomManager.handleCopilotWorkflowEdit(workflowId, description)
sendSuccess(res)
} catch (error) {
logger.error('Error handling copilot workflow edit notification:', error)
sendError(res, 'Failed to process copilot edit notification')
}
return
}
// Handle workflow revert notifications from the main API
if (req.method === 'POST' && req.url === '/api/workflow-reverted') {
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => {
try {
const { workflowId, timestamp } = JSON.parse(body)
roomManager.handleWorkflowRevert(workflowId, timestamp)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling workflow revert notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process revert notification' }))
}
})
try {
const body = await readRequestBody(req)
const { workflowId, timestamp } = JSON.parse(body)
await roomManager.handleWorkflowRevert(workflowId, timestamp)
sendSuccess(res)
} catch (error) {
logger.error('Error handling workflow revert notification:', error)
sendError(res, 'Failed to process revert notification')
}
return
}

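The accumulation pattern in `readRequestBody` (concatenate `'data'` chunks, resolve on `'end'`, reject on `'error'`) can be exercised standalone with a plain `EventEmitter` standing in for `http.IncomingMessage`. A sketch for illustration only; the real handler passes the actual request object:

```typescript
import { EventEmitter } from 'node:events'

// Sketch of readRequestBody's pattern: buffer 'data' chunks into a string,
// resolve the full body on 'end', reject on 'error'. An EventEmitter stands
// in for http.IncomingMessage purely for illustration.
function readBody(req: EventEmitter): Promise<string> {
  return new Promise((resolve, reject) => {
    let body = ''
    req.on('data', (chunk: Buffer | string) => {
      body += chunk.toString()
    })
    req.on('end', () => resolve(body))
    req.on('error', reject)
  })
}
```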

@@ -239,5 +239,3 @@ export const WorkflowOperationSchema = z.union([
VariableOperationSchema,
WorkflowStateOperationSchema,
])
export { PositionSchema, AutoConnectEdgeSchema }


@@ -102,7 +102,7 @@ export interface TraceSpan {
export interface WorkflowLog {
id: string
workflowId: string
workflowId: string | null
executionId?: string | null
deploymentVersion?: number | null
deploymentVersionName?: string | null


@@ -4,6 +4,19 @@ import type { OperationQueueState, QueuedOperation } from './types'
const logger = createLogger('OperationQueue')
/** Timeout for subblock/variable operations before considering them failed */
const SUBBLOCK_VARIABLE_TIMEOUT_MS = 15000
/** Timeout for structural operations before considering them failed */
const STRUCTURAL_TIMEOUT_MS = 5000
/** Maximum retry attempts for subblock/variable operations */
const SUBBLOCK_VARIABLE_MAX_RETRIES = 5
/** Maximum retry attempts for structural operations */
const STRUCTURAL_MAX_RETRIES = 3
/** Maximum retry delay cap for subblock/variable operations */
const SUBBLOCK_VARIABLE_MAX_RETRY_DELAY_MS = 3000
/** Base retry delay in ms, scaled linearly or exponentially per attempt */
const RETRY_DELAY_BASE_MS = 1000
const retryTimeouts = new Map<string, NodeJS.Timeout>()
const operationTimeouts = new Map<string, NodeJS.Timeout>()
@@ -200,14 +213,14 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
(operation.operation.operation === 'variable-update' &&
operation.operation.target === 'variable')
const maxRetries = isSubblockOrVariable ? 5 : 3 // 5 retries for text, 3 for structural
const maxRetries = isSubblockOrVariable ? SUBBLOCK_VARIABLE_MAX_RETRIES : STRUCTURAL_MAX_RETRIES
if (operation.retryCount < maxRetries) {
const newRetryCount = operation.retryCount + 1
// Faster retries for subblock/variable, exponential for structural
const delay = isSubblockOrVariable
? Math.min(1000 * newRetryCount, 3000) // 1s, 2s, 3s, 3s, 3s (cap at 3s)
: 2 ** newRetryCount * 1000 // 2s, 4s, 8s (exponential for structural)
? Math.min(RETRY_DELAY_BASE_MS * newRetryCount, SUBBLOCK_VARIABLE_MAX_RETRY_DELAY_MS)
: 2 ** newRetryCount * RETRY_DELAY_BASE_MS
logger.warn(
`Operation failed, retrying in ${delay}ms (attempt ${newRetryCount}/${maxRetries})`,
@@ -309,7 +322,9 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
nextOperation.operation.target === 'subblock') ||
(nextOperation.operation.operation === 'variable-update' &&
nextOperation.operation.target === 'variable')
const timeoutDuration = isSubblockOrVariable ? 15000 : 5000 // 15s for text edits, 5s for structural ops
const timeoutDuration = isSubblockOrVariable
? SUBBLOCK_VARIABLE_TIMEOUT_MS
: STRUCTURAL_TIMEOUT_MS
const timeoutId = setTimeout(() => {
logger.warn(`Operation timeout - no server response after ${timeoutDuration}ms`, {

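The retry-delay policy these constants encode reduces to a pure function: linear 1s, 2s, 3s (capped at 3s) for subblock/variable operations, exponential 2s, 4s, 8s for structural ones. A sketch; `retryDelay` is an illustrative name, not an export of the store:

```typescript
const RETRY_DELAY_BASE_MS = 1000
const SUBBLOCK_VARIABLE_MAX_RETRY_DELAY_MS = 3000

// Illustrative reduction of the retry-delay policy above. Text edits retry
// quickly with a low cap; structural ops back off exponentially.
function retryDelay(isSubblockOrVariable: boolean, retryCount: number): number {
  return isSubblockOrVariable
    ? Math.min(RETRY_DELAY_BASE_MS * retryCount, SUBBLOCK_VARIABLE_MAX_RETRY_DELAY_MS)
    : 2 ** retryCount * RETRY_DELAY_BASE_MS
}
```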

@@ -1648,6 +1648,8 @@ import {
youtubeCommentsTool,
youtubePlaylistItemsTool,
youtubeSearchTool,
youtubeTrendingTool,
youtubeVideoCategoriesTool,
youtubeVideoDetailsTool,
} from '@/tools/youtube'
import {
@@ -1982,13 +1984,15 @@ export const tools: Record<string, ToolConfig> = {
typeform_create_form: typeformCreateFormTool,
typeform_update_form: typeformUpdateFormTool,
typeform_delete_form: typeformDeleteFormTool,
youtube_search: youtubeSearchTool,
youtube_video_details: youtubeVideoDetailsTool,
youtube_channel_info: youtubeChannelInfoTool,
youtube_playlist_items: youtubePlaylistItemsTool,
youtube_comments: youtubeCommentsTool,
youtube_channel_videos: youtubeChannelVideosTool,
youtube_channel_playlists: youtubeChannelPlaylistsTool,
youtube_channel_videos: youtubeChannelVideosTool,
youtube_comments: youtubeCommentsTool,
youtube_playlist_items: youtubePlaylistItemsTool,
youtube_search: youtubeSearchTool,
youtube_trending: youtubeTrendingTool,
youtube_video_categories: youtubeVideoCategoriesTool,
youtube_video_details: youtubeVideoDetailsTool,
notion_read: notionReadTool,
notion_read_database: notionReadDatabaseTool,
notion_write: notionWriteTool,


@@ -7,8 +7,9 @@ export const youtubeChannelInfoTool: ToolConfig<
> = {
id: 'youtube_channel_info',
name: 'YouTube Channel Info',
description: 'Get detailed information about a YouTube channel.',
version: '1.0.0',
description:
'Get detailed information about a YouTube channel including statistics, branding, and content details.',
version: '1.1.0',
params: {
channelId: {
type: 'string',
@@ -33,11 +34,11 @@ export const youtubeChannelInfoTool: ToolConfig<
request: {
url: (params: YouTubeChannelInfoParams) => {
let url =
'https://www.googleapis.com/youtube/v3/channels?part=snippet,statistics,contentDetails'
'https://www.googleapis.com/youtube/v3/channels?part=snippet,statistics,contentDetails,brandingSettings'
if (params.channelId) {
url += `&id=${params.channelId}`
url += `&id=${encodeURIComponent(params.channelId)}`
} else if (params.username) {
url += `&forUsername=${params.username}`
url += `&forUsername=${encodeURIComponent(params.username)}`
}
url += `&key=${params.apiKey}`
return url
@@ -63,6 +64,11 @@ export const youtubeChannelInfoTool: ToolConfig<
viewCount: 0,
publishedAt: '',
thumbnail: '',
customUrl: null,
country: null,
uploadsPlaylistId: null,
bannerImageUrl: null,
hiddenSubscriberCount: false,
},
error: 'Channel not found',
}
@@ -72,19 +78,23 @@ export const youtubeChannelInfoTool: ToolConfig<
return {
success: true,
output: {
channelId: item.id,
title: item.snippet?.title || '',
description: item.snippet?.description || '',
channelId: item.id ?? '',
title: item.snippet?.title ?? '',
description: item.snippet?.description ?? '',
subscriberCount: Number(item.statistics?.subscriberCount || 0),
videoCount: Number(item.statistics?.videoCount || 0),
viewCount: Number(item.statistics?.viewCount || 0),
publishedAt: item.snippet?.publishedAt || '',
publishedAt: item.snippet?.publishedAt ?? '',
thumbnail:
item.snippet?.thumbnails?.high?.url ||
item.snippet?.thumbnails?.medium?.url ||
item.snippet?.thumbnails?.default?.url ||
'',
customUrl: item.snippet?.customUrl,
customUrl: item.snippet?.customUrl ?? null,
country: item.snippet?.country ?? null,
uploadsPlaylistId: item.contentDetails?.relatedPlaylists?.uploads ?? null,
bannerImageUrl: item.brandingSettings?.image?.bannerExternalUrl ?? null,
hiddenSubscriberCount: item.statistics?.hiddenSubscriberCount ?? false,
},
}
},
@@ -104,11 +114,11 @@ export const youtubeChannelInfoTool: ToolConfig<
},
subscriberCount: {
type: 'number',
description: 'Number of subscribers',
description: 'Number of subscribers (0 if hidden)',
},
videoCount: {
type: 'number',
description: 'Number of videos',
description: 'Number of public videos',
},
viewCount: {
type: 'number',
@@ -120,12 +130,31 @@ export const youtubeChannelInfoTool: ToolConfig<
},
thumbnail: {
type: 'string',
description: 'Channel thumbnail URL',
description: 'Channel thumbnail/avatar URL',
},
customUrl: {
type: 'string',
description: 'Channel custom URL',
description: 'Channel custom URL (handle)',
optional: true,
},
country: {
type: 'string',
description: 'Country the channel is associated with',
optional: true,
},
uploadsPlaylistId: {
type: 'string',
description: 'Playlist ID containing all channel uploads (use with playlist_items)',
optional: true,
},
bannerImageUrl: {
type: 'string',
description: 'Channel banner image URL',
optional: true,
},
hiddenSubscriberCount: {
type: 'boolean',
description: 'Whether the subscriber count is hidden',
},
},
}

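The encoding change in this file (passing user-supplied IDs through `encodeURIComponent` before appending them to the query string) can be shown standalone. `buildChannelUrl` below is a hypothetical helper mirroring the request URL construction, not part of the tool:

```typescript
// Hypothetical helper mirroring the URL construction above: user-supplied
// values are percent-encoded before being appended as query parameters, so
// characters like ' ' or '&' in an ID cannot mangle the request.
function buildChannelUrl(params: {
  channelId?: string
  username?: string
  apiKey: string
}): string {
  let url =
    'https://www.googleapis.com/youtube/v3/channels?part=snippet,statistics,contentDetails,brandingSettings'
  if (params.channelId) {
    url += `&id=${encodeURIComponent(params.channelId)}`
  } else if (params.username) {
    url += `&forUsername=${encodeURIComponent(params.username)}`
  }
  url += `&key=${params.apiKey}`
  return url
}
```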

@@ -10,8 +10,8 @@ export const youtubeChannelPlaylistsTool: ToolConfig<
> = {
id: 'youtube_channel_playlists',
name: 'YouTube Channel Playlists',
description: 'Get all playlists from a specific YouTube channel.',
version: '1.0.0',
description: 'Get all public playlists from a specific YouTube channel.',
version: '1.1.0',
params: {
channelId: {
type: 'string',
@@ -47,7 +47,7 @@ export const youtubeChannelPlaylistsTool: ToolConfig<
)}&key=${params.apiKey}`
url += `&maxResults=${Number(params.maxResults || 10)}`
if (params.pageToken) {
url += `&pageToken=${params.pageToken}`
url += `&pageToken=${encodeURIComponent(params.pageToken)}`
}
return url
},
@@ -60,36 +60,49 @@ export const youtubeChannelPlaylistsTool: ToolConfig<
transformResponse: async (response: Response): Promise<YouTubeChannelPlaylistsResponse> => {
const data = await response.json()
if (!data.items) {
if (data.error) {
return {
success: false,
output: {
items: [],
totalResults: 0,
nextPageToken: null,
},
error: data.error.message || 'Failed to fetch channel playlists',
}
}
if (!data.items || data.items.length === 0) {
return {
success: true,
output: {
items: [],
totalResults: 0,
nextPageToken: null,
},
error: 'No playlists found',
}
}
const items = (data.items || []).map((item: any) => ({
playlistId: item.id,
title: item.snippet?.title || '',
description: item.snippet?.description || '',
playlistId: item.id ?? '',
title: item.snippet?.title ?? '',
description: item.snippet?.description ?? '',
thumbnail:
item.snippet?.thumbnails?.medium?.url ||
item.snippet?.thumbnails?.default?.url ||
item.snippet?.thumbnails?.high?.url ||
'',
itemCount: item.contentDetails?.itemCount || 0,
publishedAt: item.snippet?.publishedAt || '',
itemCount: Number(item.contentDetails?.itemCount || 0),
publishedAt: item.snippet?.publishedAt ?? '',
channelTitle: item.snippet?.channelTitle ?? '',
}))
return {
success: true,
output: {
items,
totalResults: data.pageInfo?.totalResults || 0,
nextPageToken: data.nextPageToken,
totalResults: data.pageInfo?.totalResults || items.length,
nextPageToken: data.nextPageToken ?? null,
},
}
},
@@ -107,6 +120,7 @@ export const youtubeChannelPlaylistsTool: ToolConfig<
thumbnail: { type: 'string', description: 'Playlist thumbnail URL' },
itemCount: { type: 'number', description: 'Number of videos in playlist' },
publishedAt: { type: 'string', description: 'Playlist creation date' },
channelTitle: { type: 'string', description: 'Channel name' },
},
},
},


@@ -10,8 +10,9 @@ export const youtubeChannelVideosTool: ToolConfig<
> = {
id: 'youtube_channel_videos',
name: 'YouTube Channel Videos',
description: 'Get all videos from a specific YouTube channel, with sorting options.',
version: '1.0.0',
description:
'Search for videos from a specific YouTube channel with sorting options. For complete channel video list, use channel_info to get uploadsPlaylistId, then use playlist_items.',
version: '1.1.0',
params: {
channelId: {
type: 'string',
@@ -30,7 +31,8 @@ export const youtubeChannelVideosTool: ToolConfig<
type: 'string',
required: false,
visibility: 'user-or-llm',
description: 'Sort order: "date" (newest first), "rating", "relevance", "title", "viewCount"',
description:
'Sort order: "date" (newest first, default), "rating", "relevance", "title", "viewCount"',
},
pageToken: {
type: 'string',
@@ -52,11 +54,9 @@ export const youtubeChannelVideosTool: ToolConfig<
params.channelId
)}&key=${params.apiKey}`
url += `&maxResults=${Number(params.maxResults || 10)}`
if (params.order) {
url += `&order=${params.order}`
}
url += `&order=${params.order || 'date'}`
if (params.pageToken) {
url += `&pageToken=${params.pageToken}`
url += `&pageToken=${encodeURIComponent(params.pageToken)}`
}
return url
},
@@ -68,23 +68,38 @@ export const youtubeChannelVideosTool: ToolConfig<
transformResponse: async (response: Response): Promise<YouTubeChannelVideosResponse> => {
const data = await response.json()
if (data.error) {
return {
success: false,
output: {
items: [],
totalResults: 0,
nextPageToken: null,
},
error: data.error.message || 'Failed to fetch channel videos',
}
}
const items = (data.items || []).map((item: any) => ({
videoId: item.id?.videoId,
title: item.snippet?.title,
description: item.snippet?.description,
videoId: item.id?.videoId ?? '',
title: item.snippet?.title ?? '',
description: item.snippet?.description ?? '',
thumbnail:
item.snippet?.thumbnails?.medium?.url ||
item.snippet?.thumbnails?.default?.url ||
item.snippet?.thumbnails?.high?.url ||
'',
publishedAt: item.snippet?.publishedAt || '',
publishedAt: item.snippet?.publishedAt ?? '',
channelTitle: item.snippet?.channelTitle ?? '',
}))
return {
success: true,
output: {
items,
totalResults: data.pageInfo?.totalResults || 0,
nextPageToken: data.nextPageToken,
totalResults: data.pageInfo?.totalResults || items.length,
nextPageToken: data.nextPageToken ?? null,
},
}
},
@@ -101,6 +116,7 @@ export const youtubeChannelVideosTool: ToolConfig<
description: { type: 'string', description: 'Video description' },
thumbnail: { type: 'string', description: 'Video thumbnail URL' },
publishedAt: { type: 'string', description: 'Video publish date' },
channelTitle: { type: 'string', description: 'Channel name' },
},
},
},


@@ -4,8 +4,8 @@ import type { YouTubeCommentsParams, YouTubeCommentsResponse } from '@/tools/you
export const youtubeCommentsTool: ToolConfig<YouTubeCommentsParams, YouTubeCommentsResponse> = {
id: 'youtube_comments',
name: 'YouTube Video Comments',
description: 'Get top-level comments from a YouTube video with author details and engagement.',
version: '1.1.0',
params: {
videoId: {
type: 'string',
@@ -18,14 +18,14 @@ export const youtubeCommentsTool: ToolConfig<YouTubeCommentsParams, YouTubeComme
required: false,
visibility: 'user-only',
default: 20,
description: 'Maximum number of comments to return (1-100)',
},
order: {
type: 'string',
required: false,
visibility: 'user-or-llm',
default: 'relevance',
description: 'Order of comments: "time" (newest first) or "relevance" (most relevant first)',
},
pageToken: {
type: 'string',
@@ -43,11 +43,11 @@ export const youtubeCommentsTool: ToolConfig<YouTubeCommentsParams, YouTubeComme
request: {
url: (params: YouTubeCommentsParams) => {
let url = `https://www.googleapis.com/youtube/v3/commentThreads?part=snippet,replies&videoId=${encodeURIComponent(params.videoId)}&key=${params.apiKey}`
url += `&maxResults=${Number(params.maxResults || 20)}`
url += `&order=${params.order || 'relevance'}`
if (params.pageToken) {
url += `&pageToken=${encodeURIComponent(params.pageToken)}`
}
return url
},
@@ -60,18 +60,31 @@ export const youtubeCommentsTool: ToolConfig<YouTubeCommentsParams, YouTubeComme
transformResponse: async (response: Response): Promise<YouTubeCommentsResponse> => {
const data = await response.json()
if (data.error) {
return {
success: false,
output: {
items: [],
totalResults: 0,
nextPageToken: null,
},
error: data.error.message || 'Failed to fetch comments',
}
}
const items = (data.items || []).map((item: any) => {
const topLevelComment = item.snippet?.topLevelComment?.snippet
return {
commentId: item.snippet?.topLevelComment?.id ?? item.id ?? '',
authorDisplayName: topLevelComment?.authorDisplayName ?? '',
authorChannelUrl: topLevelComment?.authorChannelUrl ?? '',
authorProfileImageUrl: topLevelComment?.authorProfileImageUrl ?? '',
textDisplay: topLevelComment?.textDisplay ?? '',
textOriginal: topLevelComment?.textOriginal ?? '',
likeCount: Number(topLevelComment?.likeCount || 0),
publishedAt: topLevelComment?.publishedAt ?? '',
updatedAt: topLevelComment?.updatedAt ?? '',
replyCount: Number(item.snippet?.totalReplyCount || 0),
}
})
@@ -79,8 +92,8 @@ export const youtubeCommentsTool: ToolConfig<YouTubeCommentsParams, YouTubeComme
success: true,
output: {
items,
totalResults: data.pageInfo?.totalResults || items.length,
nextPageToken: data.nextPageToken ?? null,
},
}
},
@@ -88,25 +101,29 @@ export const youtubeCommentsTool: ToolConfig<YouTubeCommentsParams, YouTubeComme
outputs: {
items: {
type: 'array',
description: 'Array of top-level comments from the video',
items: {
type: 'object',
properties: {
commentId: { type: 'string', description: 'Comment ID' },
authorDisplayName: { type: 'string', description: 'Comment author display name' },
authorChannelUrl: { type: 'string', description: 'Comment author channel URL' },
authorProfileImageUrl: {
type: 'string',
description: 'Comment author profile image URL',
},
textDisplay: { type: 'string', description: 'Comment text (HTML formatted)' },
textOriginal: { type: 'string', description: 'Comment text (plain text)' },
likeCount: { type: 'number', description: 'Number of likes on the comment' },
publishedAt: { type: 'string', description: 'When the comment was posted' },
updatedAt: { type: 'string', description: 'When the comment was last edited' },
replyCount: { type: 'number', description: 'Number of replies to this comment' },
},
},
},
totalResults: {
type: 'number',
description: 'Total number of comment threads available',
},
nextPageToken: {
type: 'string',


@@ -4,6 +4,8 @@ import { youtubeChannelVideosTool } from '@/tools/youtube/channel_videos'
import { youtubeCommentsTool } from '@/tools/youtube/comments'
import { youtubePlaylistItemsTool } from '@/tools/youtube/playlist_items'
import { youtubeSearchTool } from '@/tools/youtube/search'
import { youtubeTrendingTool } from '@/tools/youtube/trending'
import { youtubeVideoCategoriesTool } from '@/tools/youtube/video_categories'
import { youtubeVideoDetailsTool } from '@/tools/youtube/video_details'
export { youtubeSearchTool }
@@ -13,3 +15,5 @@ export { youtubePlaylistItemsTool }
export { youtubeCommentsTool }
export { youtubeChannelVideosTool }
export { youtubeChannelPlaylistsTool }
export { youtubeTrendingTool }
export { youtubeVideoCategoriesTool }


@@ -10,21 +10,23 @@ export const youtubePlaylistItemsTool: ToolConfig<
> = {
id: 'youtube_playlist_items',
name: 'YouTube Playlist Items',
description:
'Get videos from a YouTube playlist. Can be used with a channel uploads playlist to get all channel videos.',
version: '1.1.0',
params: {
playlistId: {
type: 'string',
required: true,
visibility: 'user-or-llm',
description:
'YouTube playlist ID. Use uploadsPlaylistId from channel_info to get all channel videos.',
},
maxResults: {
type: 'number',
required: false,
visibility: 'user-only',
default: 10,
description: 'Maximum number of videos to return (1-50)',
},
pageToken: {
type: 'string',
@@ -42,10 +44,10 @@ export const youtubePlaylistItemsTool: ToolConfig<
request: {
url: (params: YouTubePlaylistItemsParams) => {
let url = `https://www.googleapis.com/youtube/v3/playlistItems?part=snippet,contentDetails&playlistId=${encodeURIComponent(params.playlistId)}&key=${params.apiKey}`
url += `&maxResults=${Number(params.maxResults || 10)}`
if (params.pageToken) {
url += `&pageToken=${encodeURIComponent(params.pageToken)}`
}
return url
},
@@ -58,26 +60,40 @@ export const youtubePlaylistItemsTool: ToolConfig<
transformResponse: async (response: Response): Promise<YouTubePlaylistItemsResponse> => {
const data = await response.json()
if (data.error) {
return {
success: false,
output: {
items: [],
totalResults: 0,
nextPageToken: null,
},
error: data.error.message || 'Failed to fetch playlist items',
}
}
const items = (data.items || []).map((item: any, index: number) => ({
videoId: item.contentDetails?.videoId ?? item.snippet?.resourceId?.videoId ?? '',
title: item.snippet?.title ?? '',
description: item.snippet?.description ?? '',
thumbnail:
item.snippet?.thumbnails?.medium?.url ||
item.snippet?.thumbnails?.default?.url ||
item.snippet?.thumbnails?.high?.url ||
'',
publishedAt: item.snippet?.publishedAt ?? '',
channelTitle: item.snippet?.channelTitle ?? '',
position: item.snippet?.position ?? index,
videoOwnerChannelId: item.snippet?.videoOwnerChannelId ?? null,
videoOwnerChannelTitle: item.snippet?.videoOwnerChannelTitle ?? null,
}))
return {
success: true,
output: {
items,
totalResults: data.pageInfo?.totalResults || items.length,
nextPageToken: data.nextPageToken ?? null,
},
}
},
@@ -94,8 +110,18 @@ export const youtubePlaylistItemsTool: ToolConfig<
description: { type: 'string', description: 'Video description' },
thumbnail: { type: 'string', description: 'Video thumbnail URL' },
publishedAt: { type: 'string', description: 'Date added to playlist' },
channelTitle: { type: 'string', description: 'Playlist owner channel name' },
position: { type: 'number', description: 'Position in playlist (0-indexed)' },
videoOwnerChannelId: {
type: 'string',
description: 'Channel ID of the video owner',
optional: true,
},
videoOwnerChannelTitle: {
type: 'string',
description: 'Channel name of the video owner',
optional: true,
},
},
},
},


@@ -5,8 +5,8 @@ export const youtubeSearchTool: ToolConfig<YouTubeSearchParams, YouTubeSearchRes
id: 'youtube_search',
name: 'YouTube Search',
description:
'Search for videos on YouTube using the YouTube Data API. Supports advanced filtering by channel, date range, duration, category, quality, captions, live streams, and more.',
version: '1.2.0',
params: {
query: {
type: 'string',
@@ -21,13 +21,18 @@ export const youtubeSearchTool: ToolConfig<YouTubeSearchParams, YouTubeSearchRes
default: 5,
description: 'Maximum number of videos to return (1-50)',
},
pageToken: {
type: 'string',
required: false,
visibility: 'user-only',
description: 'Page token for pagination (use nextPageToken from previous response)',
},
apiKey: {
type: 'string',
required: true,
visibility: 'user-only',
description: 'YouTube API Key',
},
// Priority 1: Essential filters
channelId: {
type: 'string',
required: false,
@@ -66,9 +71,9 @@ export const youtubeSearchTool: ToolConfig<YouTubeSearchParams, YouTubeSearchRes
type: 'string',
required: false,
visibility: 'user-or-llm',
description:
'Filter by YouTube category ID (e.g., "10" for Music, "20" for Gaming). Use video_categories to list IDs.',
},
// Priority 2: Very useful filters
videoDefinition: {
type: 'string',
required: false,
@@ -82,6 +87,13 @@ export const youtubeSearchTool: ToolConfig<YouTubeSearchParams, YouTubeSearchRes
description:
'Filter by caption availability: "closedCaption" (has captions), "none" (no captions), "any"',
},
eventType: {
type: 'string',
required: false,
visibility: 'user-or-llm',
description:
'Filter by live broadcast status: "live" (currently live), "upcoming" (scheduled), "completed" (past streams)',
},
regionCode: {
type: 'string',
required: false,
@@ -110,7 +122,9 @@ export const youtubeSearchTool: ToolConfig<YouTubeSearchParams, YouTubeSearchRes
)}`
url += `&maxResults=${Number(params.maxResults || 5)}`
// Add Priority 1 filters if provided
if (params.pageToken) {
url += `&pageToken=${encodeURIComponent(params.pageToken)}`
}
if (params.channelId) {
url += `&channelId=${encodeURIComponent(params.channelId)}`
}
@@ -129,14 +143,15 @@ export const youtubeSearchTool: ToolConfig<YouTubeSearchParams, YouTubeSearchRes
if (params.videoCategoryId) {
url += `&videoCategoryId=${params.videoCategoryId}`
}
// Add Priority 2 filters if provided
if (params.videoDefinition) {
url += `&videoDefinition=${params.videoDefinition}`
}
if (params.videoCaption) {
url += `&videoCaption=${params.videoCaption}`
}
if (params.eventType) {
url += `&eventType=${params.eventType}`
}
if (params.regionCode) {
url += `&regionCode=${params.regionCode}`
}
@@ -157,22 +172,39 @@ export const youtubeSearchTool: ToolConfig<YouTubeSearchParams, YouTubeSearchRes
transformResponse: async (response: Response): Promise<YouTubeSearchResponse> => {
const data = await response.json()
if (data.error) {
return {
success: false,
output: {
items: [],
totalResults: 0,
nextPageToken: null,
},
error: data.error.message || 'Search failed',
}
}
const items = (data.items || []).map((item: any) => ({
videoId: item.id?.videoId ?? '',
title: item.snippet?.title ?? '',
description: item.snippet?.description ?? '',
thumbnail:
item.snippet?.thumbnails?.default?.url ||
item.snippet?.thumbnails?.medium?.url ||
item.snippet?.thumbnails?.high?.url ||
'',
channelId: item.snippet?.channelId ?? '',
channelTitle: item.snippet?.channelTitle ?? '',
publishedAt: item.snippet?.publishedAt ?? '',
liveBroadcastContent: item.snippet?.liveBroadcastContent ?? 'none',
}))
return {
success: true,
output: {
items,
totalResults: data.pageInfo?.totalResults || 0,
nextPageToken: data.nextPageToken ?? null,
},
}
},
@@ -188,6 +220,13 @@ export const youtubeSearchTool: ToolConfig<YouTubeSearchParams, YouTubeSearchRes
title: { type: 'string', description: 'Video title' },
description: { type: 'string', description: 'Video description' },
thumbnail: { type: 'string', description: 'Video thumbnail URL' },
channelId: { type: 'string', description: 'Channel ID that uploaded the video' },
channelTitle: { type: 'string', description: 'Channel name' },
publishedAt: { type: 'string', description: 'Video publish date' },
liveBroadcastContent: {
type: 'string',
description: 'Live broadcast status: "none", "live", or "upcoming"',
},
},
},
},
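As an aside on the URL assembly shown in this file: the same query string can be built with `URLSearchParams`, which percent-encodes every value uniformly instead of relying on per-parameter `encodeURIComponent` calls. A minimal sketch, not the repo's code (`buildSearchUrl` and its parameter shape are hypothetical):

```typescript
// Hypothetical sketch: assembling a YouTube search URL via URLSearchParams
// so every value is encoded consistently.
function buildSearchUrl(params: {
  query: string
  apiKey: string
  maxResults?: number
  pageToken?: string
}): string {
  const qs = new URLSearchParams({
    part: 'snippet',
    type: 'video',
    q: params.query,
    key: params.apiKey,
    maxResults: String(params.maxResults ?? 5),
  })
  // Optional parameters are only appended when present.
  if (params.pageToken) qs.set('pageToken', params.pageToken)
  return `https://www.googleapis.com/youtube/v3/search?${qs.toString()}`
}
```

One trade-off: `URLSearchParams` encodes spaces as `+` rather than `%20`, which the API accepts but which differs byte-for-byte from `encodeURIComponent` output.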


@@ -0,0 +1,139 @@
import type { ToolConfig } from '@/tools/types'
import type { YouTubeTrendingParams, YouTubeTrendingResponse } from '@/tools/youtube/types'
export const youtubeTrendingTool: ToolConfig<YouTubeTrendingParams, YouTubeTrendingResponse> = {
id: 'youtube_trending',
name: 'YouTube Trending Videos',
description:
'Get the most popular/trending videos on YouTube. Can filter by region and video category.',
version: '1.0.0',
params: {
regionCode: {
type: 'string',
required: false,
visibility: 'user-or-llm',
description:
'ISO 3166-1 alpha-2 country code to get trending videos for (e.g., "US", "GB", "JP"). Defaults to US.',
},
videoCategoryId: {
type: 'string',
required: false,
visibility: 'user-or-llm',
description:
'Filter by video category ID (e.g., "10" for Music, "20" for Gaming, "17" for Sports)',
},
maxResults: {
type: 'number',
required: false,
visibility: 'user-only',
default: 10,
description: 'Maximum number of trending videos to return (1-50)',
},
pageToken: {
type: 'string',
required: false,
visibility: 'user-only',
description: 'Page token for pagination',
},
apiKey: {
type: 'string',
required: true,
visibility: 'user-only',
description: 'YouTube API Key',
},
},
request: {
url: (params: YouTubeTrendingParams) => {
let url = `https://www.googleapis.com/youtube/v3/videos?part=snippet,statistics,contentDetails&chart=mostPopular&key=${params.apiKey}`
url += `&maxResults=${Number(params.maxResults || 10)}`
url += `&regionCode=${params.regionCode || 'US'}`
if (params.videoCategoryId) {
url += `&videoCategoryId=${params.videoCategoryId}`
}
if (params.pageToken) {
url += `&pageToken=${encodeURIComponent(params.pageToken)}`
}
return url
},
method: 'GET',
headers: () => ({
'Content-Type': 'application/json',
}),
},
transformResponse: async (response: Response): Promise<YouTubeTrendingResponse> => {
const data = await response.json()
if (data.error) {
return {
success: false,
output: {
items: [],
totalResults: 0,
nextPageToken: null,
},
error: data.error.message || 'Failed to fetch trending videos',
}
}
const items = (data.items || []).map((item: any) => ({
videoId: item.id ?? '',
title: item.snippet?.title ?? '',
description: item.snippet?.description ?? '',
thumbnail:
item.snippet?.thumbnails?.high?.url ||
item.snippet?.thumbnails?.medium?.url ||
item.snippet?.thumbnails?.default?.url ||
'',
channelId: item.snippet?.channelId ?? '',
channelTitle: item.snippet?.channelTitle ?? '',
publishedAt: item.snippet?.publishedAt ?? '',
viewCount: Number(item.statistics?.viewCount || 0),
likeCount: Number(item.statistics?.likeCount || 0),
commentCount: Number(item.statistics?.commentCount || 0),
duration: item.contentDetails?.duration ?? '',
}))
return {
success: true,
output: {
items,
totalResults: data.pageInfo?.totalResults || items.length,
nextPageToken: data.nextPageToken ?? null,
},
}
},
outputs: {
items: {
type: 'array',
description: 'Array of trending videos',
items: {
type: 'object',
properties: {
videoId: { type: 'string', description: 'YouTube video ID' },
title: { type: 'string', description: 'Video title' },
description: { type: 'string', description: 'Video description' },
thumbnail: { type: 'string', description: 'Video thumbnail URL' },
channelId: { type: 'string', description: 'Channel ID' },
channelTitle: { type: 'string', description: 'Channel name' },
publishedAt: { type: 'string', description: 'Video publish date' },
viewCount: { type: 'number', description: 'Number of views' },
likeCount: { type: 'number', description: 'Number of likes' },
commentCount: { type: 'number', description: 'Number of comments' },
duration: { type: 'string', description: 'Video duration in ISO 8601 format' },
},
},
},
totalResults: {
type: 'number',
description: 'Total number of trending videos available',
},
nextPageToken: {
type: 'string',
description: 'Token for accessing the next page of results',
optional: true,
},
},
}
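The nullable `nextPageToken` returned above is what drives pagination across these tools. A hedged caller-side sketch (the `collectPages` helper and its `Page` shape are illustrative, not part of the codebase):

```typescript
// Illustrative helper: follow nextPageToken until it is null or a page
// budget is exhausted, accumulating items along the way.
type Page<T> = { items: T[]; nextPageToken: string | null }

async function collectPages<T>(
  fetchPage: (pageToken?: string) => Promise<Page<T>>,
  maxPages = 3
): Promise<T[]> {
  const all: T[] = []
  let token: string | undefined
  for (let i = 0; i < maxPages; i++) {
    const page = await fetchPage(token)
    all.push(...page.items)
    if (page.nextPageToken === null) break // no further pages
    token = page.nextPageToken
  }
  return all
}
```

Capping `maxPages` matters in practice: each page costs YouTube API quota, so an unbounded loop over a large result set can exhaust a daily quota quickly.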


@@ -16,6 +16,7 @@ export interface YouTubeSearchParams {
regionCode?: string
relevanceLanguage?: string
safeSearch?: 'moderate' | 'none' | 'strict'
eventType?: 'completed' | 'live' | 'upcoming'
}
export interface YouTubeSearchResponse extends ToolResponse {
@@ -25,9 +26,13 @@ export interface YouTubeSearchResponse extends ToolResponse {
title: string
description: string
thumbnail: string
channelId: string
channelTitle: string
publishedAt: string
liveBroadcastContent: string
}>
totalResults: number
nextPageToken?: string | null
}
}
@@ -48,8 +53,24 @@ export interface YouTubeVideoDetailsResponse extends ToolResponse {
viewCount: number
likeCount: number
commentCount: number
favoriteCount: number
thumbnail: string
tags: string[]
categoryId: string | null
definition: string | null
caption: string | null
licensedContent: boolean | null
privacyStatus: string | null
liveBroadcastContent: string | null
defaultLanguage: string | null
defaultAudioLanguage: string | null
// Live streaming details
isLiveContent: boolean
scheduledStartTime: string | null
actualStartTime: string | null
actualEndTime: string | null
concurrentViewers: number | null
activeLiveChatId: string | null
}
}
@@ -69,7 +90,11 @@ export interface YouTubeChannelInfoResponse extends ToolResponse {
viewCount: number
publishedAt: string
thumbnail: string
customUrl: string | null
country: string | null
uploadsPlaylistId: string | null
bannerImageUrl: string | null
hiddenSubscriberCount: boolean
}
}
@@ -90,9 +115,11 @@ export interface YouTubePlaylistItemsResponse extends ToolResponse {
publishedAt: string
channelTitle: string
position: number
videoOwnerChannelId: string | null
videoOwnerChannelTitle: string | null
}>
totalResults: number
nextPageToken?: string | null
}
}
@@ -110,15 +137,16 @@ export interface YouTubeCommentsResponse extends ToolResponse {
commentId: string
authorDisplayName: string
authorChannelUrl: string
authorProfileImageUrl: string
textDisplay: string
textOriginal: string
likeCount: number
publishedAt: string
updatedAt: string
replyCount: number
}>
totalResults: number
nextPageToken?: string | null
}
}
@@ -138,9 +166,10 @@ export interface YouTubeChannelVideosResponse extends ToolResponse {
description: string
thumbnail: string
publishedAt: string
channelTitle: string
}>
totalResults: number
nextPageToken?: string | null
}
}
@@ -160,9 +189,55 @@ export interface YouTubeChannelPlaylistsResponse extends ToolResponse {
thumbnail: string
itemCount: number
publishedAt: string
channelTitle: string
}>
totalResults: number
nextPageToken?: string | null
}
}
export interface YouTubeTrendingParams {
apiKey: string
regionCode?: string
videoCategoryId?: string
maxResults?: number
pageToken?: string
}
export interface YouTubeTrendingResponse extends ToolResponse {
output: {
items: Array<{
videoId: string
title: string
description: string
thumbnail: string
channelId: string
channelTitle: string
publishedAt: string
viewCount: number
likeCount: number
commentCount: number
duration: string
}>
totalResults: number
nextPageToken?: string | null
}
}
export interface YouTubeVideoCategoriesParams {
apiKey: string
regionCode?: string
hl?: string
}
export interface YouTubeVideoCategoriesResponse extends ToolResponse {
output: {
items: Array<{
categoryId: string
title: string
assignable: boolean
}>
totalResults: number
nextPageToken?: string
}
}
@@ -174,3 +249,5 @@ export type YouTubeResponse =
| YouTubeCommentsResponse
| YouTubeChannelVideosResponse
| YouTubeChannelPlaylistsResponse
| YouTubeTrendingResponse
| YouTubeVideoCategoriesResponse


@@ -0,0 +1,108 @@
import type { ToolConfig } from '@/tools/types'
import type {
YouTubeVideoCategoriesParams,
YouTubeVideoCategoriesResponse,
} from '@/tools/youtube/types'
export const youtubeVideoCategoriesTool: ToolConfig<
YouTubeVideoCategoriesParams,
YouTubeVideoCategoriesResponse
> = {
id: 'youtube_video_categories',
name: 'YouTube Video Categories',
description:
'Get a list of video categories available on YouTube. Use this to discover valid category IDs for filtering search and trending results.',
version: '1.0.0',
params: {
regionCode: {
type: 'string',
required: false,
visibility: 'user-or-llm',
description:
'ISO 3166-1 alpha-2 country code to get categories for (e.g., "US", "GB", "JP"). Defaults to US.',
},
hl: {
type: 'string',
required: false,
visibility: 'user-only',
description: 'Language for category titles (e.g., "en", "es", "fr"). Defaults to English.',
},
apiKey: {
type: 'string',
required: true,
visibility: 'user-only',
description: 'YouTube API Key',
},
},
request: {
url: (params: YouTubeVideoCategoriesParams) => {
let url = `https://www.googleapis.com/youtube/v3/videoCategories?part=snippet&key=${params.apiKey}`
url += `&regionCode=${params.regionCode || 'US'}`
if (params.hl) {
url += `&hl=${params.hl}`
}
return url
},
method: 'GET',
headers: () => ({
'Content-Type': 'application/json',
}),
},
transformResponse: async (response: Response): Promise<YouTubeVideoCategoriesResponse> => {
const data = await response.json()
if (data.error) {
return {
success: false,
output: {
items: [],
totalResults: 0,
},
error: data.error.message || 'Failed to fetch video categories',
}
}
const items = (data.items || [])
.filter((item: any) => item.snippet?.assignable !== false)
.map((item: any) => ({
categoryId: item.id ?? '',
title: item.snippet?.title ?? '',
assignable: item.snippet?.assignable ?? false,
}))
return {
success: true,
output: {
items,
totalResults: items.length,
},
}
},
outputs: {
items: {
type: 'array',
description: 'Array of video categories available in the specified region',
items: {
type: 'object',
properties: {
categoryId: {
type: 'string',
description: 'Category ID to use in search/trending filters (e.g., "10" for Music)',
},
title: { type: 'string', description: 'Human-readable category name' },
assignable: {
type: 'boolean',
description: 'Whether videos can be tagged with this category',
},
},
},
},
totalResults: {
type: 'number',
description: 'Total number of categories available',
},
},
}


@@ -7,8 +7,9 @@ export const youtubeVideoDetailsTool: ToolConfig<
> = {
id: 'youtube_video_details',
name: 'YouTube Video Details',
description:
'Get detailed information about a specific YouTube video including statistics, content details, live streaming info, and metadata.',
version: '1.2.0',
params: {
videoId: {
type: 'string',
@@ -26,7 +27,7 @@ export const youtubeVideoDetailsTool: ToolConfig<
request: {
url: (params: YouTubeVideoDetailsParams) => {
return `https://www.googleapis.com/youtube/v3/videos?part=snippet,statistics,contentDetails,status,liveStreamingDetails&id=${encodeURIComponent(params.videoId)}&key=${params.apiKey}`
},
method: 'GET',
headers: () => ({
@@ -51,32 +52,68 @@ export const youtubeVideoDetailsTool: ToolConfig<
viewCount: 0,
likeCount: 0,
commentCount: 0,
favoriteCount: 0,
thumbnail: '',
tags: [],
categoryId: null,
definition: null,
caption: null,
licensedContent: null,
privacyStatus: null,
liveBroadcastContent: null,
defaultLanguage: null,
defaultAudioLanguage: null,
isLiveContent: false,
scheduledStartTime: null,
actualStartTime: null,
actualEndTime: null,
concurrentViewers: null,
activeLiveChatId: null,
},
error: 'Video not found',
}
}
const item = data.items[0]
const liveDetails = item.liveStreamingDetails
return {
success: true,
output: {
videoId: item.id ?? '',
title: item.snippet?.title ?? '',
description: item.snippet?.description ?? '',
channelId: item.snippet?.channelId ?? '',
channelTitle: item.snippet?.channelTitle ?? '',
publishedAt: item.snippet?.publishedAt ?? '',
duration: item.contentDetails?.duration ?? '',
viewCount: Number(item.statistics?.viewCount || 0),
likeCount: Number(item.statistics?.likeCount || 0),
commentCount: Number(item.statistics?.commentCount || 0),
favoriteCount: Number(item.statistics?.favoriteCount || 0),
thumbnail:
item.snippet?.thumbnails?.high?.url ||
item.snippet?.thumbnails?.medium?.url ||
item.snippet?.thumbnails?.default?.url ||
'',
tags: item.snippet?.tags ?? [],
categoryId: item.snippet?.categoryId ?? null,
definition: item.contentDetails?.definition ?? null,
caption: item.contentDetails?.caption ?? null,
licensedContent: item.contentDetails?.licensedContent ?? null,
privacyStatus: item.status?.privacyStatus ?? null,
liveBroadcastContent: item.snippet?.liveBroadcastContent ?? null,
defaultLanguage: item.snippet?.defaultLanguage ?? null,
defaultAudioLanguage: item.snippet?.defaultAudioLanguage ?? null,
// Live streaming details
isLiveContent: liveDetails !== undefined,
scheduledStartTime: liveDetails?.scheduledStartTime ?? null,
actualStartTime: liveDetails?.actualStartTime ?? null,
actualEndTime: liveDetails?.actualEndTime ?? null,
concurrentViewers: liveDetails?.concurrentViewers
? Number(liveDetails.concurrentViewers)
: null,
activeLiveChatId: liveDetails?.activeLiveChatId ?? null,
},
}
},
@@ -108,7 +145,7 @@ export const youtubeVideoDetailsTool: ToolConfig<
},
duration: {
type: 'string',
description: 'Video duration in ISO 8601 format (e.g., "PT4M13S" for 4 min 13 sec)',
},
viewCount: {
type: 'number',
@@ -122,6 +159,10 @@ export const youtubeVideoDetailsTool: ToolConfig<
type: 'number',
description: 'Number of comments',
},
favoriteCount: {
type: 'number',
description: 'Number of times added to favorites',
},
thumbnail: {
type: 'string',
description: 'Video thumbnail URL',
@@ -132,6 +173,74 @@ export const youtubeVideoDetailsTool: ToolConfig<
items: {
type: 'string',
},
},
categoryId: {
type: 'string',
description: 'YouTube video category ID',
optional: true,
},
definition: {
type: 'string',
description: 'Video definition: "hd" or "sd"',
optional: true,
},
caption: {
type: 'string',
description: 'Whether captions are available: "true" or "false"',
optional: true,
},
licensedContent: {
type: 'boolean',
description: 'Whether the video is licensed content',
optional: true,
},
privacyStatus: {
type: 'string',
description: 'Video privacy status: "public", "private", or "unlisted"',
optional: true,
},
liveBroadcastContent: {
type: 'string',
description: 'Live broadcast status: "live", "upcoming", or "none"',
optional: true,
},
defaultLanguage: {
type: 'string',
description: 'Default language of the video metadata',
optional: true,
},
defaultAudioLanguage: {
type: 'string',
description: 'Default audio language of the video',
optional: true,
},
isLiveContent: {
type: 'boolean',
description: 'Whether this video is or was a live stream',
},
scheduledStartTime: {
type: 'string',
description: 'Scheduled start time for upcoming live streams (ISO 8601)',
optional: true,
},
actualStartTime: {
type: 'string',
description: 'When the live stream actually started (ISO 8601)',
optional: true,
},
actualEndTime: {
type: 'string',
description: 'When the live stream ended (ISO 8601)',
optional: true,
},
concurrentViewers: {
type: 'number',
description: 'Current number of viewers (only for active live streams)',
optional: true,
},
activeLiveChatId: {
type: 'string',
description: 'Live chat ID for the stream (only for active live streams)',
optional: true,
},
},
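The ISO 8601 `duration` string documented above (e.g. "PT4M13S") usually needs converting before it can be compared or summed. A small illustrative parser, not part of the tool, assuming durations stay within the hours/minutes/seconds `PT…` form YouTube emits:

```typescript
// Illustrative helper: convert a YouTube ISO 8601 duration such as
// "PT4M13S" into total seconds.
function iso8601DurationToSeconds(duration: string): number {
  const match = /^PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?$/.exec(duration)
  if (!match) return 0 // unrecognized shape; callers may prefer to throw
  const [, h, m, s] = match
  // Unmatched groups are undefined, so coalesce to 0 before summing.
  return Number(h ?? 0) * 3600 + Number(m ?? 0) * 60 + Number(s ?? 0)
}
```

For example, "PT1H2M3S" yields 3723 seconds, while a malformed input falls through to 0 rather than NaN.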


@@ -104,6 +104,7 @@
"@react-email/components": "^0.0.34",
"@react-email/render": "2.0.0",
"@sim/logger": "workspace:*",
"@socket.io/redis-adapter": "8.3.0",
"@t3-oss/env-nextjs": "0.13.4",
"@tanstack/react-query": "5.90.8",
"@tanstack/react-query-devtools": "5.90.2",
@@ -174,6 +175,7 @@
"react-simple-code-editor": "^0.14.1",
"react-window": "2.2.3",
"reactflow": "^11.11.4",
"redis": "5.10.0",
"rehype-autolink-headings": "^7.1.0",
"rehype-slug": "^6.0.0",
"remark-gfm": "4.0.1",
@@ -1146,6 +1148,16 @@
"@reactflow/node-toolbar": ["@reactflow/node-toolbar@1.3.14", "", { "dependencies": { "@reactflow/core": "11.11.4", "classcat": "^5.0.3", "zustand": "^4.4.1" }, "peerDependencies": { "react": ">=17", "react-dom": ">=17" } }, "sha512-rbynXQnH/xFNu4P9H+hVqlEUafDCkEoCy0Dg9mG22Sg+rY/0ck6KkrAQrYrTgXusd+cEJOMK0uOOFCK2/5rSGQ=="],
"@redis/bloom": ["@redis/bloom@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-doIF37ob+l47n0rkpRNgU8n4iacBlKM9xLiP1LtTZTvz8TloJB8qx/MgvhMhKdYG+CvCY2aPBnN2706izFn/4A=="],
"@redis/client": ["@redis/client@5.10.0", "", { "dependencies": { "cluster-key-slot": "1.1.2" } }, "sha512-JXmM4XCoso6C75Mr3lhKA3eNxSzkYi3nCzxDIKY+YOszYsJjuKbFgVtguVPbLMOttN4iu2fXoc2BGhdnYhIOxA=="],
"@redis/json": ["@redis/json@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-B2G8XlOmTPUuZtD44EMGbtoepQG34RCDXLZbjrtON1Djet0t5Ri7/YPXvL9aomXqP8lLTreaprtyLKF4tmXEEA=="],
"@redis/search": ["@redis/search@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-3SVcPswoSfp2HnmWbAGUzlbUPn7fOohVu2weUQ0S+EMiQi8jwjL+aN2p6V3TI65eNfVsJ8vyPvqWklm6H6esmg=="],
"@redis/time-series": ["@redis/time-series@5.10.0", "", { "peerDependencies": { "@redis/client": "^5.10.0" } }, "sha512-cPkpddXH5kc/SdRhF0YG0qtjL+noqFT0AcHbQ6axhsPsO7iqPi1cjxgdkE9TNeKiBUUdCaU1DbqkR/LzbzPBhg=="],
"@resvg/resvg-wasm": ["@resvg/resvg-wasm@2.4.0", "", {}, "sha512-C7c51Nn4yTxXFKvgh2txJFNweaVcfUPQxwEUFw4aWsCmfiBDJsTSwviIF8EcwjQ6k8bPyMWCl1vw4BdxE569Cg=="],
"@rolldown/pluginutils": ["@rolldown/pluginutils@1.0.0-beta.27", "", {}, "sha512-+d0F4MKMCbeVUJwG96uQ4SgAznZNSq93I3V+9NHA4OpvqG8mRCpGdKmK8l/dl02h2CCDHwW2FqilnTyDcAnqjA=="],
@@ -1340,6 +1352,8 @@
"@socket.io/component-emitter": ["@socket.io/component-emitter@3.1.2", "", {}, "sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA=="],
"@socket.io/redis-adapter": ["@socket.io/redis-adapter@8.3.0", "", { "dependencies": { "debug": "~4.3.1", "notepack.io": "~3.0.1", "uid2": "1.0.0" }, "peerDependencies": { "socket.io-adapter": "^2.5.4" } }, "sha512-ly0cra+48hDmChxmIpnESKrc94LjRL80TEmZVscuQ/WWkRP81nNj8W8cCGMqbI4L6NCuAaPRSzZF1a9GlAxxnA=="],
"@standard-schema/spec": ["@standard-schema/spec@1.1.0", "", {}, "sha512-l2aFy5jALhniG5HgqrD6jXLi/rUWrKvqN/qJx6yoJsgKhblVd+iqqU4RCXavm/jPityDo5TCvKMnpjKnOriy0w=="],
"@standard-schema/utils": ["@standard-schema/utils@0.3.0", "", {}, "sha512-e7Mew686owMaPJVNNLs55PUvgz371nKgwsc4vxE49zsODpJEnxgxRo2y/OKrqueavXgZNMDVj3DdHFlaSAeU8g=="],
@@ -2802,6 +2816,8 @@
"normalize-range": ["normalize-range@0.1.2", "", {}, "sha512-bdok/XvKII3nUpklnV6P2hxtMNrCboOjAcyBuQnWEhO665FwrSNRxU+AqpsyvO6LgGYPspN+lu5CLtw4jPRKNA=="],
"notepack.io": ["notepack.io@3.0.1", "", {}, "sha512-TKC/8zH5pXIAMVQio2TvVDTtPRX+DJPHDqjRbxogtFiByHyzKmy96RA0JtCQJ+WouyyL4A10xomQzgbUT+1jCg=="],
"npm-run-path": ["npm-run-path@5.3.0", "", { "dependencies": { "path-key": "^4.0.0" } }, "sha512-ppwTtiJZq0O/ai0z7yfudtBpWIoxM8yE6nHi1X47eFR2EWORqfbu6CnPlNsjeN683eT0qG6H/Pyf9fCcvjnnnQ=="],
"npm-to-yarn": ["npm-to-yarn@3.0.1", "", {}, "sha512-tt6PvKu4WyzPwWUzy/hvPFqn+uwXO0K1ZHka8az3NnrhWJDmSqI8ncWq0fkL0k/lmmi5tAC11FXwXuh0rFbt1A=="],
@@ -3072,6 +3088,8 @@
"redent": ["redent@3.0.0", "", { "dependencies": { "indent-string": "^4.0.0", "strip-indent": "^3.0.0" } }, "sha512-6tDA8g98We0zd0GvVeMT9arEOnTw9qM03L9cJXaCjrip1OO764RDBLBfrB4cwzNGDj5OA5ioymC9GkizgWJDUg=="],
"redis": ["redis@5.10.0", "", { "dependencies": { "@redis/bloom": "5.10.0", "@redis/client": "5.10.0", "@redis/json": "5.10.0", "@redis/search": "5.10.0", "@redis/time-series": "5.10.0" } }, "sha512-0/Y+7IEiTgVGPrLFKy8oAEArSyEJkU0zvgV5xyi9NzNQ+SLZmyFbUsWIbgPcd4UdUh00opXGKlXJwMmsis5Byw=="],
"redis-errors": ["redis-errors@1.2.0", "", {}, "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w=="],
"redis-parser": ["redis-parser@3.0.0", "", { "dependencies": { "redis-errors": "^1.0.0" } }, "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A=="],
@@ -3434,6 +3452,8 @@
"ufo": ["ufo@1.6.3", "", {}, "sha512-yDJTmhydvl5lJzBmy/hyOAA0d+aqCBuwl818haVdYCRrWV84o7YyeVm4QlVHStqNrrJSTb6jKuFAVqAFsr+K3Q=="],
"uid2": ["uid2@1.0.0", "", {}, "sha512-+I6aJUv63YAcY9n4mQreLUt0d4lvwkkopDNmpomkAUz0fAkEMV9pRWxN0EjhW1YfRhcuyHg2v3mwddCDW1+LFQ=="],
"ulid": ["ulid@2.4.0", "", { "bin": { "ulid": "bin/cli.js" } }, "sha512-fIRiVTJNcSRmXKPZtGzFQv9WRrZ3M9eoptl/teFJvjOzmpU+/K/JH6HZ8deBfb5vMEpicJcLn7JmvdknlMq7Zg=="],
"unbzip2-stream": ["unbzip2-stream@1.4.3", "", { "dependencies": { "buffer": "^5.2.1", "through": "^2.3.8" } }, "sha512-mlExGW4w71ebDJviH16lQLtZS32VKqsSfk80GCfUlwT/4/hNRFsoscrF/c++9xinkMzECL1uL9DDwXqFWkruPg=="],
@@ -3852,6 +3872,8 @@
"@shuding/opentype.js/fflate": ["fflate@0.7.4", "", {}, "sha512-5u2V/CDW15QM1XbbgS+0DfPxVB+jUKhWEKuuFuHncbk3tEEqzmoXL+2KyOFuKGqOnmdIy0/davWF1CkuwtibCw=="],
"@socket.io/redis-adapter/debug": ["debug@4.3.7", "", { "dependencies": { "ms": "^2.1.3" } }, "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ=="],
"@tailwindcss/node/jiti": ["jiti@2.6.1", "", { "bin": { "jiti": "lib/jiti-cli.mjs" } }, "sha512-ekilCSN1jwRvIbgeg/57YFh8qQDNbwDb9xT/qu2DAHbFFZUicIl4ygVaAvzveMhMVr3LnpSKTNnwt8PoOfmKhQ=="],
"@tailwindcss/oxide-wasm32-wasi/@emnapi/core": ["@emnapi/core@1.8.1", "", { "dependencies": { "@emnapi/wasi-threads": "1.1.0", "tslib": "^2.4.0" }, "bundled": true }, "sha512-AvT9QFpxK0Zd8J0jopedNm+w/2fIzvtPKPjqyw9jwvBaReTTqPBk9Hixaz7KbjimP+QNz605/XnjFcDAL2pqBg=="],


@@ -44,6 +44,10 @@ spec:
env:
- name: DATABASE_URL
value: {{ include "sim.databaseUrl" . | quote }}
{{- if .Values.app.env.REDIS_URL }}
- name: REDIS_URL
value: {{ .Values.app.env.REDIS_URL | quote }}
{{- end }}
{{- range $key, $value := .Values.realtime.env }}
- name: {{ $key }}
value: {{ $value | quote }}

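The conditional block above only emits `REDIS_URL` when it is set in the chart values. A minimal values fragment (a sketch — the actual key layout under `app.env` may differ in the real chart) that would activate it:

```yaml
# values.yaml sketch (assumed structure, matching .Values.app.env.REDIS_URL)
app:
  env:
    # When set, the template renders a REDIS_URL env var; when omitted,
    # the {{- if }} guard skips the block entirely.
    REDIS_URL: "redis://sim-redis:6379"
```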

@@ -0,0 +1,8 @@
ALTER TABLE "workflow_execution_logs" DROP CONSTRAINT "workflow_execution_logs_workflow_id_workflow_id_fk";
--> statement-breakpoint
ALTER TABLE "workflow_execution_snapshots" DROP CONSTRAINT "workflow_execution_snapshots_workflow_id_workflow_id_fk";
--> statement-breakpoint
ALTER TABLE "workflow_execution_logs" ALTER COLUMN "workflow_id" DROP NOT NULL;--> statement-breakpoint
ALTER TABLE "workflow_execution_snapshots" ALTER COLUMN "workflow_id" DROP NOT NULL;--> statement-breakpoint
ALTER TABLE "workflow_execution_logs" ADD CONSTRAINT "workflow_execution_logs_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE set null ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_execution_snapshots" ADD CONSTRAINT "workflow_execution_snapshots_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE set null ON UPDATE no action;

File diff suppressed because it is too large


@@ -1037,6 +1037,13 @@
"when": 1769626313827,
"tag": "0148_aberrant_venom",
"breakpoints": true
},
{
"idx": 149,
"version": "7",
"when": 1769656977701,
"tag": "0149_next_cerise",
"breakpoints": true
}
]
}


@@ -268,9 +268,7 @@ export const workflowExecutionSnapshots = pgTable(
'workflow_execution_snapshots',
{
id: text('id').primaryKey(),
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
workflowId: text('workflow_id').references(() => workflow.id, { onDelete: 'set null' }),
stateHash: text('state_hash').notNull(),
stateData: jsonb('state_data').notNull(),
createdAt: timestamp('created_at').notNull().defaultNow(),
@@ -290,9 +288,7 @@ export const workflowExecutionLogs = pgTable(
'workflow_execution_logs',
{
id: text('id').primaryKey(),
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
workflowId: text('workflow_id').references(() => workflow.id, { onDelete: 'set null' }),
workspaceId: text('workspace_id')
.notNull()
.references(() => workspace.id, { onDelete: 'cascade' }),