mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
Compare commits
11 Commits
feat/crede
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8848780f56 | ||
|
|
fefeb010de | ||
|
|
ee6c7f98ff | ||
|
|
64758af2b6 | ||
|
|
8c09e19293 | ||
|
|
feb1c88d2f | ||
|
|
78007c11a0 | ||
|
|
bac1d5e588 | ||
|
|
7fdab14266 | ||
|
|
3b9e663f25 | ||
|
|
381bc1d556 |
10
README.md
10
README.md
@@ -74,10 +74,6 @@ docker compose -f docker-compose.prod.yml up -d
|
||||
|
||||
Open [http://localhost:3000](http://localhost:3000)
|
||||
|
||||
#### Background worker note
|
||||
|
||||
The Docker Compose stack starts a dedicated worker container by default. If `REDIS_URL` is not configured, the worker will start, log that it is idle, and do no queue processing. This is expected. Queue-backed API, webhook, and schedule execution requires Redis; installs without Redis continue to use the inline execution path.
|
||||
|
||||
Sim also supports local models via [Ollama](https://ollama.ai) and [vLLM](https://docs.vllm.ai/) — see the [Docker self-hosting docs](https://docs.sim.ai/self-hosting/docker) for setup details.
|
||||
|
||||
### Self-hosted: Manual Setup
|
||||
@@ -117,12 +113,10 @@ cd packages/db && bunx drizzle-kit migrate --config=./drizzle.config.ts
|
||||
5. Start development servers:
|
||||
|
||||
```bash
|
||||
bun run dev:full # Starts Next.js app, realtime socket server, and the BullMQ worker
|
||||
bun run dev:full # Starts Next.js app and realtime socket server
|
||||
```
|
||||
|
||||
If `REDIS_URL` is not configured, the worker will remain idle and execution continues inline.
|
||||
|
||||
Or run separately: `bun run dev` (Next.js), `cd apps/sim && bun run dev:sockets` (realtime), and `cd apps/sim && bun run worker` (BullMQ worker).
|
||||
Or run separately: `bun run dev` (Next.js) and `cd apps/sim && bun run dev:sockets` (realtime).
|
||||
|
||||
## Copilot API Keys
|
||||
|
||||
|
||||
@@ -1,150 +0,0 @@
|
||||
---
|
||||
title: Credential
|
||||
---
|
||||
|
||||
import { Callout } from 'fumadocs-ui/components/callout'
|
||||
import { Tab, Tabs } from 'fumadocs-ui/components/tabs'
|
||||
import { Image } from '@/components/ui/image'
|
||||
import { FAQ } from '@/components/ui/faq'
|
||||
|
||||
The Credential block has two operations: **Select Credential** picks a single OAuth credential and outputs its ID reference for downstream blocks; **List Credentials** returns all OAuth credentials in the workspace (optionally filtered by provider) as an array for iteration.
|
||||
|
||||
<div className="flex justify-center">
|
||||
<Image
|
||||
src="/static/blocks/credential.png"
|
||||
alt="Credential Block"
|
||||
width={400}
|
||||
height={300}
|
||||
className="my-6"
|
||||
/>
|
||||
</div>
|
||||
|
||||
<Callout>
|
||||
The Credential block outputs credential **ID references**, not secrets. Downstream blocks receive the ID and resolve the actual OAuth token securely during their own execution.
|
||||
</Callout>
|
||||
|
||||
## Configuration Options
|
||||
|
||||
### Operation
|
||||
|
||||
| Value | Description |
|
||||
|---|---|
|
||||
| **Select Credential** | Pick one OAuth credential and output its reference — use this to wire a single credential into downstream blocks |
|
||||
| **List Credentials** | Return all OAuth credentials in the workspace as an array — use this with a ForEach loop |
|
||||
|
||||
### Credential (Select operation)
|
||||
|
||||
Select an OAuth credential from your workspace. The dropdown shows all connected OAuth accounts (Google, GitHub, Slack, etc.).
|
||||
|
||||
In advanced mode, paste a credential ID directly. You can copy a credential ID from your workspace's Credentials settings page.
|
||||
|
||||
### Provider (List operation)
|
||||
|
||||
Filter the returned OAuth credentials by provider. Select one or more providers from the dropdown — only providers you have credentials for will appear. Leave empty to return all OAuth credentials.
|
||||
|
||||
| Example | Returns |
|
||||
|---|---|
|
||||
| Gmail | Gmail credentials only |
|
||||
| Slack | Slack credentials only |
|
||||
| Gmail + Slack | Gmail and Slack credentials |
|
||||
|
||||
## Outputs
|
||||
|
||||
<Tabs items={['Select Credential', 'List Credentials']}>
|
||||
<Tab>
|
||||
| Output | Type | Description |
|
||||
|---|---|---|
|
||||
| `credentialId` | `string` | The credential ID — pipe this into other blocks' credential fields |
|
||||
| `displayName` | `string` | Human-readable name (e.g. "waleed@company.com") |
|
||||
| `providerId` | `string` | OAuth provider ID (e.g. `google-email`, `slack`) |
|
||||
</Tab>
|
||||
<Tab>
|
||||
| Output | Type | Description |
|
||||
|---|---|---|
|
||||
| `credentials` | `json` | Array of OAuth credential objects (see shape below) |
|
||||
| `count` | `number` | Number of credentials returned |
|
||||
|
||||
Each object in the `credentials` array:
|
||||
|
||||
| Field | Type | Description |
|
||||
|---|---|---|
|
||||
| `credentialId` | `string` | The credential ID |
|
||||
| `displayName` | `string` | Human-readable name |
|
||||
| `providerId` | `string` | OAuth provider ID |
|
||||
</Tab>
|
||||
</Tabs>
|
||||
|
||||
## Example Use Cases
|
||||
|
||||
**Shared credential across multiple blocks** — Define once, use everywhere
|
||||
```
|
||||
Credential (Select, Google) → Gmail (Send) & Google Drive (Upload) & Google Calendar (Create)
|
||||
```
|
||||
|
||||
**Multi-account workflows** — Route to different credentials based on logic
|
||||
```
|
||||
Agent (Determine account) → Condition → Credential A or Credential B → Slack (Post)
|
||||
```
|
||||
|
||||
**Iterate over all Gmail accounts**
|
||||
```
|
||||
Credential (List, Provider: Gmail) → ForEach Loop → Gmail (Send) using <loop.currentItem.credentialId>
|
||||
```
|
||||
|
||||
<div className="flex justify-center">
|
||||
<Image
|
||||
src="/static/blocks/credential-loop.png"
|
||||
alt="Credential List wired into a ForEach Loop"
|
||||
width={900}
|
||||
height={400}
|
||||
className="my-6"
|
||||
/>
|
||||
</div>
|
||||
|
||||
## How to wire a Credential block
|
||||
|
||||
### Select Credential
|
||||
|
||||
1. Drop a **Credential** block and select your OAuth credential from the picker
|
||||
2. In the downstream block, switch to **advanced mode** on its credential field
|
||||
3. Enter `<credentialBlockName.credentialId>` as the value
|
||||
|
||||
<Tabs items={['Gmail', 'Slack']}>
|
||||
<Tab>
|
||||
In the Gmail block's credential field (advanced mode):
|
||||
```
|
||||
<myCredential.credentialId>
|
||||
```
|
||||
</Tab>
|
||||
<Tab>
|
||||
In the Slack block's credential field (advanced mode):
|
||||
```
|
||||
<myCredential.credentialId>
|
||||
```
|
||||
</Tab>
|
||||
</Tabs>
|
||||
|
||||
### List Credentials
|
||||
|
||||
1. Drop a **Credential** block, set Operation to **List Credentials**
|
||||
2. Optionally select one or more **Providers** to narrow results (only your connected providers appear)
|
||||
3. Wire `<credentialBlockName.credentials>` into a **ForEach Loop** as the items source
|
||||
4. Inside the loop, reference `<loop.currentItem.credentialId>` in downstream blocks' credential fields
|
||||
|
||||
## Best Practices
|
||||
|
||||
- **Define once, reference many times**: When five blocks use the same Google account, use one Credential block and wire all five to `<credential.credentialId>` instead of selecting the account five times
|
||||
- **Outputs are safe to log**: The `credentialId` output is a UUID reference, not a secret. It is safe to inspect in execution logs
|
||||
- **Use for environment switching**: Pair with a Condition block to route to a production or staging OAuth credential based on a workflow variable
|
||||
- **Advanced mode is required**: Downstream blocks must be in advanced mode on their credential field to accept a dynamic reference
|
||||
- **Use List + ForEach for fan-out**: When you need to run the same action across all accounts of a provider, List Credentials feeds naturally into a ForEach loop
|
||||
- **Narrow by provider**: Use the Provider multiselect to filter to specific services — only providers you have credentials for are shown
|
||||
|
||||
<FAQ items={[
|
||||
{ question: "Does the Credential block expose my secret or token?", answer: "No. The block outputs a credential ID (a UUID), not the actual OAuth token. Downstream blocks receive the ID and resolve the token securely in their own execution context. Secrets never appear in workflow state, logs, or the canvas." },
|
||||
{ question: "What credential types does it support?", answer: "OAuth connected accounts only (Google, GitHub, Slack, etc.). Environment variables and service accounts cannot be resolved by ID in downstream blocks, so they are not supported." },
|
||||
{ question: "How is Select different from just copying a credential ID into advanced mode?", answer: "Functionally identical — both pass the same credential ID to the downstream block. The Credential block adds value when you need to use one credential in many blocks (change it once), or when you want to select between credentials dynamically using a Condition block." },
|
||||
{ question: "Can I list all OAuth credentials in my workspace?", answer: "Yes. Set the Operation to 'List Credentials'. Optionally filter by provider using the Provider multiselect. Wire the credentials output into a ForEach loop to process each credential individually." },
|
||||
{ question: "Can I use a Credential block output in a Function block?", answer: "Yes. Reference <credential.credentialId> in your Function block's code. Note that the function will receive the raw UUID string — if you need the resolved token, the downstream block must handle the resolution (as integration blocks do). The Function block does not automatically resolve credential IDs." },
|
||||
{ question: "What happens if the credential is deleted?", answer: "The Select operation will throw an error at execution time: 'Credential not found'. The List operation will simply omit the deleted credential from the results. Update the Credential block to select a valid credential before re-running." },
|
||||
]} />
|
||||
@@ -4,7 +4,6 @@
|
||||
"agent",
|
||||
"api",
|
||||
"condition",
|
||||
"credential",
|
||||
"evaluator",
|
||||
"function",
|
||||
"guardrails",
|
||||
|
||||
@@ -195,17 +195,6 @@ By default, your usage is capped at the credits included in your plan. To allow
|
||||
|
||||
Max (individual) shares the same rate limits as team plans. Team plans (Pro or Max for Teams) use the Max-tier rate limits.
|
||||
|
||||
### Concurrent Execution Limits
|
||||
|
||||
| Plan | Concurrent Executions |
|
||||
|------|----------------------|
|
||||
| **Free** | 5 |
|
||||
| **Pro** | 50 |
|
||||
| **Max / Team** | 200 |
|
||||
| **Enterprise** | 200 (customizable) |
|
||||
|
||||
Concurrent execution limits control how many workflow executions can run simultaneously within a workspace. When the limit is reached, new executions are queued and admitted as running executions complete. Manual runs from the editor are not subject to these limits.
|
||||
|
||||
### File Storage
|
||||
|
||||
| Plan | Storage |
|
||||
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 63 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 12 KiB |
@@ -1,18 +1,16 @@
|
||||
'use client'
|
||||
|
||||
import { Suspense, useEffect, useMemo, useRef, useState } from 'react'
|
||||
import { Suspense, useMemo, useRef, useState } from 'react'
|
||||
import { Turnstile, type TurnstileInstance } from '@marsidev/react-turnstile'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { Eye, EyeOff, Loader2 } from 'lucide-react'
|
||||
import Link from 'next/link'
|
||||
import { useRouter, useSearchParams } from 'next/navigation'
|
||||
import { usePostHog } from 'posthog-js/react'
|
||||
import { Input, Label } from '@/components/emcn'
|
||||
import { client, useSession } from '@/lib/auth/auth-client'
|
||||
import { getEnv, isFalsy, isTruthy } from '@/lib/core/config/env'
|
||||
import { cn } from '@/lib/core/utils/cn'
|
||||
import { quickValidateEmail } from '@/lib/messaging/email/validation'
|
||||
import { captureEvent } from '@/lib/posthog/client'
|
||||
import { AUTH_SUBMIT_BTN } from '@/app/(auth)/components/auth-button-classes'
|
||||
import { SocialLoginButtons } from '@/app/(auth)/components/social-login-buttons'
|
||||
import { SSOLoginButton } from '@/app/(auth)/components/sso-login-button'
|
||||
@@ -83,12 +81,7 @@ function SignupFormContent({
|
||||
const router = useRouter()
|
||||
const searchParams = useSearchParams()
|
||||
const { refetch: refetchSession } = useSession()
|
||||
const posthog = usePostHog()
|
||||
const [isLoading, setIsLoading] = useState(false)
|
||||
|
||||
useEffect(() => {
|
||||
captureEvent(posthog, 'signup_page_viewed', {})
|
||||
}, [posthog])
|
||||
const [showPassword, setShowPassword] = useState(false)
|
||||
const [password, setPassword] = useState('')
|
||||
const [passwordErrors, setPasswordErrors] = useState<string[]>([])
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
'use client'
|
||||
|
||||
import { useEffect } from 'react'
|
||||
import { usePostHog } from 'posthog-js/react'
|
||||
import { captureEvent } from '@/lib/posthog/client'
|
||||
|
||||
export function LandingAnalytics() {
|
||||
const posthog = usePostHog()
|
||||
|
||||
useEffect(() => {
|
||||
captureEvent(posthog, 'landing_page_viewed', {})
|
||||
}, [posthog])
|
||||
|
||||
return null
|
||||
}
|
||||
@@ -13,7 +13,6 @@ import {
|
||||
Templates,
|
||||
Testimonials,
|
||||
} from '@/app/(home)/components'
|
||||
import { LandingAnalytics } from '@/app/(home)/landing-analytics'
|
||||
|
||||
/**
|
||||
* Landing page root component.
|
||||
@@ -46,7 +45,6 @@ export default async function Landing() {
|
||||
>
|
||||
Skip to main content
|
||||
</a>
|
||||
<LandingAnalytics />
|
||||
<StructuredData />
|
||||
<header>
|
||||
<Navbar blogPosts={blogPosts} />
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
* @see stores/constants.ts for the source of truth
|
||||
*/
|
||||
:root {
|
||||
--sidebar-width: 0px; /* 0 outside workspace; blocking script always sets actual value on workspace pages */
|
||||
--sidebar-width: 248px; /* SIDEBAR_WIDTH.DEFAULT */
|
||||
--panel-width: 320px; /* PANEL_WIDTH.DEFAULT */
|
||||
--toolbar-triggers-height: 300px; /* TOOLBAR_TRIGGERS_HEIGHT.DEFAULT */
|
||||
--editor-connections-height: 172px; /* EDITOR_CONNECTIONS_HEIGHT.DEFAULT */
|
||||
|
||||
@@ -7,7 +7,6 @@ import { generateAgentCard, generateSkillsFromWorkflow } from '@/lib/a2a/agent-c
|
||||
import type { AgentCapabilities, AgentSkill } from '@/lib/a2a/types'
|
||||
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { getRedisClient } from '@/lib/core/config/redis'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
|
||||
import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
|
||||
|
||||
@@ -181,17 +180,6 @@ export async function DELETE(request: NextRequest, { params }: { params: Promise
|
||||
|
||||
logger.info(`Deleted A2A agent: ${agentId}`)
|
||||
|
||||
captureServerEvent(
|
||||
auth.userId,
|
||||
'a2a_agent_deleted',
|
||||
{
|
||||
agent_id: agentId,
|
||||
workflow_id: existingAgent.workflowId,
|
||||
workspace_id: existingAgent.workspaceId,
|
||||
},
|
||||
{ groups: { workspace: existingAgent.workspaceId } }
|
||||
)
|
||||
|
||||
return NextResponse.json({ success: true })
|
||||
} catch (error) {
|
||||
logger.error('Error deleting agent:', error)
|
||||
@@ -263,16 +251,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<R
|
||||
}
|
||||
|
||||
logger.info(`Published A2A agent: ${agentId}`)
|
||||
captureServerEvent(
|
||||
auth.userId,
|
||||
'a2a_agent_published',
|
||||
{
|
||||
agent_id: agentId,
|
||||
workflow_id: existingAgent.workflowId,
|
||||
workspace_id: existingAgent.workspaceId,
|
||||
},
|
||||
{ groups: { workspace: existingAgent.workspaceId } }
|
||||
)
|
||||
return NextResponse.json({ success: true, isPublished: true })
|
||||
}
|
||||
|
||||
@@ -295,16 +273,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<R
|
||||
}
|
||||
|
||||
logger.info(`Unpublished A2A agent: ${agentId}`)
|
||||
captureServerEvent(
|
||||
auth.userId,
|
||||
'a2a_agent_unpublished',
|
||||
{
|
||||
agent_id: agentId,
|
||||
workflow_id: existingAgent.workflowId,
|
||||
workspace_id: existingAgent.workspaceId,
|
||||
},
|
||||
{ groups: { workspace: existingAgent.workspaceId } }
|
||||
)
|
||||
return NextResponse.json({ success: true, isPublished: false })
|
||||
}
|
||||
|
||||
|
||||
@@ -14,7 +14,6 @@ import { generateSkillsFromWorkflow } from '@/lib/a2a/agent-card'
|
||||
import { A2A_DEFAULT_CAPABILITIES } from '@/lib/a2a/constants'
|
||||
import { sanitizeAgentName } from '@/lib/a2a/utils'
|
||||
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
|
||||
import { hasValidStartBlockInState } from '@/lib/workflows/triggers/trigger-utils'
|
||||
import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
|
||||
@@ -202,16 +201,6 @@ export async function POST(request: NextRequest) {
|
||||
|
||||
logger.info(`Created A2A agent ${agentId} for workflow ${workflowId}`)
|
||||
|
||||
captureServerEvent(
|
||||
auth.userId,
|
||||
'a2a_agent_created',
|
||||
{ agent_id: agentId, workflow_id: workflowId, workspace_id: workspaceId },
|
||||
{
|
||||
groups: { workspace: workspaceId },
|
||||
setOnce: { first_a2a_agent_created_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
return NextResponse.json({ success: true, agent }, { status: 201 })
|
||||
} catch (error) {
|
||||
logger.error('Error creating agent:', error)
|
||||
|
||||
@@ -17,7 +17,6 @@ import {
|
||||
hasUsableSubscriptionStatus,
|
||||
} from '@/lib/billing/subscriptions/utils'
|
||||
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
|
||||
const logger = createLogger('SwitchPlan')
|
||||
|
||||
@@ -174,13 +173,6 @@ export async function POST(request: NextRequest) {
|
||||
interval: targetInterval,
|
||||
})
|
||||
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'subscription_changed',
|
||||
{ from_plan: sub.plan ?? 'unknown', to_plan: targetPlanName, interval: targetInterval },
|
||||
{ set: { plan: targetPlanName } }
|
||||
)
|
||||
|
||||
return NextResponse.json({ success: true, plan: targetPlanName, interval: targetInterval })
|
||||
} catch (error) {
|
||||
logger.error('Failed to switch subscription', {
|
||||
|
||||
@@ -4,7 +4,7 @@ import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { recordUsage } from '@/lib/billing/core/usage-log'
|
||||
import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
|
||||
import { checkInternalApiKey } from '@/lib/copilot/utils'
|
||||
import { checkInternalApiKey } from '@/lib/copilot/request/http'
|
||||
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor'
|
||||
import { checkInternalApiKey } from '@/lib/copilot/utils'
|
||||
import { checkInternalApiKey } from '@/lib/copilot/request/http'
|
||||
|
||||
const logger = createLogger('CopilotApiKeysValidate')
|
||||
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { NextResponse } from 'next/server'
|
||||
import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository'
|
||||
import { abortActiveStream, waitForPendingChatStream } from '@/lib/copilot/chat-streaming'
|
||||
import { SIM_AGENT_API_URL } from '@/lib/copilot/constants'
|
||||
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers'
|
||||
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request/http'
|
||||
import { abortActiveStream } from '@/lib/copilot/request/session/abort'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
|
||||
const logger = createLogger('CopilotChatAbortAPI')
|
||||
const GO_EXPLICIT_ABORT_TIMEOUT_MS = 3000
|
||||
|
||||
export async function POST(request: Request) {
|
||||
@@ -15,7 +17,12 @@ export async function POST(request: Request) {
|
||||
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
|
||||
}
|
||||
|
||||
const body = await request.json().catch(() => ({}))
|
||||
const body = await request.json().catch((err) => {
|
||||
logger.warn('Abort request body parse failed; continuing with empty object', {
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
})
|
||||
return {}
|
||||
})
|
||||
const streamId = typeof body.streamId === 'string' ? body.streamId : ''
|
||||
let chatId = typeof body.chatId === 'string' ? body.chatId : ''
|
||||
|
||||
@@ -24,7 +31,13 @@ export async function POST(request: Request) {
|
||||
}
|
||||
|
||||
if (!chatId) {
|
||||
const run = await getLatestRunForStream(streamId, authenticatedUserId).catch(() => null)
|
||||
const run = await getLatestRunForStream(streamId, authenticatedUserId).catch((err) => {
|
||||
logger.warn('getLatestRunForStream failed while resolving chatId for abort', {
|
||||
streamId,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
})
|
||||
return null
|
||||
})
|
||||
if (run?.chatId) {
|
||||
chatId = run.chatId
|
||||
}
|
||||
@@ -50,15 +63,13 @@ export async function POST(request: Request) {
|
||||
if (!response.ok) {
|
||||
throw new Error(`Explicit abort marker request failed: ${response.status}`)
|
||||
}
|
||||
} catch {
|
||||
// best effort: local abort should still proceed even if Go marker fails
|
||||
} catch (err) {
|
||||
logger.warn('Explicit abort marker request failed; proceeding with local abort', {
|
||||
streamId,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
})
|
||||
}
|
||||
|
||||
const aborted = await abortActiveStream(streamId)
|
||||
if (chatId) {
|
||||
await waitForPendingChatStream(chatId, GO_EXPLICIT_ABORT_TIMEOUT_MS + 1000, streamId).catch(
|
||||
() => false
|
||||
)
|
||||
}
|
||||
return NextResponse.json({ aborted })
|
||||
}
|
||||
|
||||
@@ -36,11 +36,11 @@ vi.mock('drizzle-orm', () => ({
|
||||
eq: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'eq' })),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/copilot/chat-lifecycle', () => ({
|
||||
vi.mock('@/lib/copilot/chat/lifecycle', () => ({
|
||||
getAccessibleCopilotChat: mockGetAccessibleCopilotChat,
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/copilot/task-events', () => ({
|
||||
vi.mock('@/lib/copilot/tasks', () => ({
|
||||
taskPubSub: { publishStatusChanged: vi.fn() },
|
||||
}))
|
||||
|
||||
|
||||
@@ -5,8 +5,8 @@ import { and, eq } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { getAccessibleCopilotChat } from '@/lib/copilot/chat-lifecycle'
|
||||
import { taskPubSub } from '@/lib/copilot/task-events'
|
||||
import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle'
|
||||
import { taskPubSub } from '@/lib/copilot/tasks'
|
||||
|
||||
const logger = createLogger('DeleteChatAPI')
|
||||
|
||||
|
||||
119
apps/sim/app/api/copilot/chat/queries.ts
Normal file
119
apps/sim/app/api/copilot/chat/queries.ts
Normal file
@@ -0,0 +1,119 @@
|
||||
import { db } from '@sim/db'
|
||||
import { copilotChats } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, desc, eq } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle'
|
||||
import {
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
createBadRequestResponse,
|
||||
createInternalServerErrorResponse,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request/http'
|
||||
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
|
||||
import { assertActiveWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
|
||||
|
||||
const logger = createLogger('CopilotChatAPI')
|
||||
|
||||
function transformChat(chat: {
|
||||
id: string
|
||||
title: string | null
|
||||
model: string | null
|
||||
messages: unknown
|
||||
planArtifact?: unknown
|
||||
config?: unknown
|
||||
conversationId?: string | null
|
||||
resources?: unknown
|
||||
createdAt: Date | null
|
||||
updatedAt: Date | null
|
||||
}) {
|
||||
return {
|
||||
id: chat.id,
|
||||
title: chat.title,
|
||||
model: chat.model,
|
||||
messages: Array.isArray(chat.messages) ? chat.messages : [],
|
||||
messageCount: Array.isArray(chat.messages) ? chat.messages.length : 0,
|
||||
planArtifact: chat.planArtifact || null,
|
||||
config: chat.config || null,
|
||||
...('conversationId' in chat ? { activeStreamId: chat.conversationId || null } : {}),
|
||||
...('resources' in chat
|
||||
? { resources: Array.isArray(chat.resources) ? chat.resources : [] }
|
||||
: {}),
|
||||
createdAt: chat.createdAt,
|
||||
updatedAt: chat.updatedAt,
|
||||
}
|
||||
}
|
||||
|
||||
export async function GET(req: NextRequest) {
|
||||
try {
|
||||
const { searchParams } = new URL(req.url)
|
||||
const workflowId = searchParams.get('workflowId')
|
||||
const workspaceId = searchParams.get('workspaceId')
|
||||
const chatId = searchParams.get('chatId')
|
||||
|
||||
const { userId: authenticatedUserId, isAuthenticated } =
|
||||
await authenticateCopilotRequestSessionOnly()
|
||||
if (!isAuthenticated || !authenticatedUserId) {
|
||||
return createUnauthorizedResponse()
|
||||
}
|
||||
|
||||
if (chatId) {
|
||||
const chat = await getAccessibleCopilotChat(chatId, authenticatedUserId)
|
||||
if (!chat) {
|
||||
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
|
||||
}
|
||||
|
||||
logger.info(`Retrieved chat ${chatId}`)
|
||||
return NextResponse.json({ success: true, chat: transformChat(chat) })
|
||||
}
|
||||
|
||||
if (!workflowId && !workspaceId) {
|
||||
return createBadRequestResponse('workflowId, workspaceId, or chatId is required')
|
||||
}
|
||||
|
||||
if (workspaceId) {
|
||||
await assertActiveWorkspaceAccess(workspaceId, authenticatedUserId)
|
||||
}
|
||||
|
||||
if (workflowId) {
|
||||
const authorization = await authorizeWorkflowByWorkspacePermission({
|
||||
workflowId,
|
||||
userId: authenticatedUserId,
|
||||
action: 'read',
|
||||
})
|
||||
if (!authorization.allowed) {
|
||||
return createUnauthorizedResponse()
|
||||
}
|
||||
}
|
||||
|
||||
const scopeFilter = workflowId
|
||||
? eq(copilotChats.workflowId, workflowId)
|
||||
: eq(copilotChats.workspaceId, workspaceId!)
|
||||
|
||||
const chats = await db
|
||||
.select({
|
||||
id: copilotChats.id,
|
||||
title: copilotChats.title,
|
||||
model: copilotChats.model,
|
||||
messages: copilotChats.messages,
|
||||
planArtifact: copilotChats.planArtifact,
|
||||
config: copilotChats.config,
|
||||
createdAt: copilotChats.createdAt,
|
||||
updatedAt: copilotChats.updatedAt,
|
||||
})
|
||||
.from(copilotChats)
|
||||
.where(and(eq(copilotChats.userId, authenticatedUserId), scopeFilter))
|
||||
.orderBy(desc(copilotChats.updatedAt))
|
||||
|
||||
const scope = workflowId ? `workflow ${workflowId}` : `workspace ${workspaceId}`
|
||||
logger.info(`Retrieved ${chats.length} chats for ${scope}`)
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
chats: chats.map(transformChat),
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error('Error fetching copilot chats:', error)
|
||||
return createInternalServerErrorResponse('Failed to fetch chats')
|
||||
}
|
||||
}
|
||||
65
apps/sim/app/api/copilot/chat/rename/route.ts
Normal file
65
apps/sim/app/api/copilot/chat/rename/route.ts
Normal file
@@ -0,0 +1,65 @@
|
||||
import { db } from '@sim/db'
|
||||
import { copilotChats } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, eq } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle'
|
||||
import { taskPubSub } from '@/lib/copilot/tasks'
|
||||
|
||||
const logger = createLogger('RenameChatAPI')
|
||||
|
||||
const RenameChatSchema = z.object({
|
||||
chatId: z.string().min(1),
|
||||
title: z.string().min(1).max(200),
|
||||
})
|
||||
|
||||
export async function PATCH(request: NextRequest) {
|
||||
try {
|
||||
const session = await getSession()
|
||||
if (!session?.user?.id) {
|
||||
return NextResponse.json({ success: false, error: 'Unauthorized' }, { status: 401 })
|
||||
}
|
||||
|
||||
const body = await request.json()
|
||||
const { chatId, title } = RenameChatSchema.parse(body)
|
||||
|
||||
const chat = await getAccessibleCopilotChat(chatId, session.user.id)
|
||||
if (!chat) {
|
||||
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
|
||||
}
|
||||
|
||||
const now = new Date()
|
||||
const [updated] = await db
|
||||
.update(copilotChats)
|
||||
.set({ title, updatedAt: now, lastSeenAt: now })
|
||||
.where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, session.user.id)))
|
||||
.returning({ id: copilotChats.id, workspaceId: copilotChats.workspaceId })
|
||||
|
||||
if (!updated) {
|
||||
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
|
||||
}
|
||||
|
||||
logger.info('Chat renamed', { chatId, title })
|
||||
|
||||
if (updated.workspaceId) {
|
||||
taskPubSub?.publishStatusChanged({
|
||||
workspaceId: updated.workspaceId,
|
||||
chatId,
|
||||
type: 'renamed',
|
||||
})
|
||||
}
|
||||
|
||||
return NextResponse.json({ success: true })
|
||||
} catch (error) {
|
||||
if (error instanceof z.ZodError) {
|
||||
return NextResponse.json(
|
||||
{ success: false, error: 'Invalid request data', details: error.errors },
|
||||
{ status: 400 }
|
||||
)
|
||||
}
|
||||
logger.error('Error renaming chat:', error)
|
||||
return NextResponse.json({ success: false, error: 'Failed to rename chat' }, { status: 500 })
|
||||
}
|
||||
}
|
||||
@@ -10,8 +10,8 @@ import {
|
||||
createInternalServerErrorResponse,
|
||||
createNotFoundResponse,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request-helpers'
|
||||
import type { ChatResource, ResourceType } from '@/lib/copilot/resources'
|
||||
} from '@/lib/copilot/request/http'
|
||||
import type { ChatResource, ResourceType } from '@/lib/copilot/resources/persistence'
|
||||
|
||||
const logger = createLogger('CopilotChatResourcesAPI')
|
||||
|
||||
|
||||
@@ -1,46 +1,45 @@
|
||||
import { db } from '@sim/db'
|
||||
import { copilotChats } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, desc, eq, sql } from 'drizzle-orm'
|
||||
import { eq, sql } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { createRunSegment } from '@/lib/copilot/async-runs/repository'
|
||||
import { getAccessibleCopilotChat, resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
|
||||
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
|
||||
import { type ChatLoadResult, resolveOrCreateChat } from '@/lib/copilot/chat/lifecycle'
|
||||
import { buildCopilotRequestPayload } from '@/lib/copilot/chat/payload'
|
||||
import {
|
||||
acquirePendingChatStream,
|
||||
createSSEStream,
|
||||
releasePendingChatStream,
|
||||
requestChatTitle,
|
||||
SSE_RESPONSE_HEADERS,
|
||||
} from '@/lib/copilot/chat-streaming'
|
||||
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
|
||||
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
|
||||
import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer'
|
||||
import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types'
|
||||
import { resolveActiveResourceContext } from '@/lib/copilot/process-contents'
|
||||
buildPersistedAssistantMessage,
|
||||
buildPersistedUserMessage,
|
||||
} from '@/lib/copilot/chat/persisted-message'
|
||||
import {
|
||||
processContextsServer,
|
||||
resolveActiveResourceContext,
|
||||
} from '@/lib/copilot/chat/process-contents'
|
||||
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/constants'
|
||||
import {
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
createBadRequestResponse,
|
||||
createInternalServerErrorResponse,
|
||||
createRequestTracker,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request-helpers'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
} from '@/lib/copilot/request/http'
|
||||
import { createSSEStream, SSE_RESPONSE_HEADERS } from '@/lib/copilot/request/lifecycle/start'
|
||||
import {
|
||||
authorizeWorkflowByWorkspacePermission,
|
||||
resolveWorkflowIdForUser,
|
||||
} from '@/lib/workflows/utils'
|
||||
import {
|
||||
assertActiveWorkspaceAccess,
|
||||
getUserEntityPermissions,
|
||||
} from '@/lib/workspaces/permissions/utils'
|
||||
acquirePendingChatStream,
|
||||
getPendingChatStreamId,
|
||||
releasePendingChatStream,
|
||||
} from '@/lib/copilot/request/session'
|
||||
import type { OrchestratorResult } from '@/lib/copilot/request/types'
|
||||
import { getWorkflowById, resolveWorkflowIdForUser } from '@/lib/workflows/utils'
|
||||
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
|
||||
import type { ChatContext } from '@/stores/panel'
|
||||
|
||||
export const maxDuration = 3600
|
||||
|
||||
const logger = createLogger('CopilotChatAPI')
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Schemas
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const FileAttachmentSchema = z.object({
|
||||
id: z.string(),
|
||||
key: z.string(),
|
||||
@@ -67,7 +66,6 @@ const ChatMessageSchema = z.object({
|
||||
mode: z.enum(COPILOT_REQUEST_MODES).optional().default('agent'),
|
||||
prefetch: z.boolean().optional(),
|
||||
createNewChat: z.boolean().optional().default(false),
|
||||
stream: z.boolean().optional().default(true),
|
||||
implicitFeedback: z.string().optional(),
|
||||
fileAttachments: z.array(FileAttachmentSchema).optional(),
|
||||
resourceAttachments: z.array(ResourceAttachmentSchema).optional(),
|
||||
@@ -105,27 +103,25 @@ const ChatMessageSchema = z.object({
|
||||
userTimezone: z.string().optional(),
|
||||
})
|
||||
|
||||
/**
|
||||
* POST /api/copilot/chat
|
||||
* Send messages to sim agent and handle chat persistence
|
||||
*/
|
||||
// ---------------------------------------------------------------------------
|
||||
// POST /api/copilot/chat
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export async function POST(req: NextRequest) {
|
||||
const tracker = createRequestTracker()
|
||||
let actualChatId: string | undefined
|
||||
let pendingChatStreamAcquired = false
|
||||
let pendingChatStreamHandedOff = false
|
||||
let pendingChatStreamID: string | undefined
|
||||
let chatStreamLockAcquired = false
|
||||
let userMessageIdToUse = ''
|
||||
|
||||
try {
|
||||
// Get session to access user information including name
|
||||
// 1. Auth
|
||||
const session = await getSession()
|
||||
|
||||
if (!session?.user?.id) {
|
||||
return createUnauthorizedResponse()
|
||||
}
|
||||
|
||||
const authenticatedUserId = session.user.id
|
||||
|
||||
// 2. Parse & validate
|
||||
const body = await req.json()
|
||||
const {
|
||||
message,
|
||||
@@ -138,7 +134,6 @@ export async function POST(req: NextRequest) {
|
||||
mode,
|
||||
prefetch,
|
||||
createNewChat,
|
||||
stream,
|
||||
implicitFeedback,
|
||||
fileAttachments,
|
||||
resourceAttachments,
|
||||
@@ -152,17 +147,12 @@ export async function POST(req: NextRequest) {
|
||||
? contexts.map((ctx) => {
|
||||
if (ctx.kind !== 'blocks') return ctx
|
||||
if (Array.isArray(ctx.blockIds) && ctx.blockIds.length > 0) return ctx
|
||||
if (ctx.blockId) {
|
||||
return {
|
||||
...ctx,
|
||||
blockIds: [ctx.blockId],
|
||||
}
|
||||
}
|
||||
if (ctx.blockId) return { ...ctx, blockIds: [ctx.blockId] }
|
||||
return ctx
|
||||
})
|
||||
: contexts
|
||||
|
||||
// Copilot route always requires a workflow scope
|
||||
// 3. Resolve workflow & workspace
|
||||
const resolved = await resolveWorkflowIdForUser(
|
||||
authenticatedUserId,
|
||||
providedWorkflowId,
|
||||
@@ -174,64 +164,29 @@ export async function POST(req: NextRequest) {
|
||||
'No workflows found. Create a workflow first or provide a valid workflowId.'
|
||||
)
|
||||
}
|
||||
const workflowId = resolved.workflowId
|
||||
const workflowResolvedName = resolved.workflowName
|
||||
const { workflowId, workflowName: workflowResolvedName } = resolved
|
||||
|
||||
// Resolve workspace from workflow so it can be sent as implicit context to the copilot.
|
||||
let resolvedWorkspaceId: string | undefined
|
||||
try {
|
||||
const { getWorkflowById } = await import('@/lib/workflows/utils')
|
||||
const wf = await getWorkflowById(workflowId)
|
||||
resolvedWorkspaceId = wf?.workspaceId ?? undefined
|
||||
} catch {
|
||||
logger
|
||||
.withMetadata({ requestId: tracker.requestId, messageId: userMessageId })
|
||||
.warn('Failed to resolve workspaceId from workflow')
|
||||
logger.warn(`[${tracker.requestId}] Failed to resolve workspaceId from workflow`)
|
||||
}
|
||||
|
||||
captureServerEvent(
|
||||
authenticatedUserId,
|
||||
'copilot_chat_sent',
|
||||
{
|
||||
workflow_id: workflowId,
|
||||
workspace_id: resolvedWorkspaceId ?? '',
|
||||
has_file_attachments: Array.isArray(fileAttachments) && fileAttachments.length > 0,
|
||||
has_contexts: Array.isArray(contexts) && contexts.length > 0,
|
||||
mode,
|
||||
},
|
||||
{
|
||||
groups: resolvedWorkspaceId ? { workspace: resolvedWorkspaceId } : undefined,
|
||||
setOnce: { first_copilot_use_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
const userMessageIdToUse = userMessageId || crypto.randomUUID()
|
||||
const reqLogger = logger.withMetadata({
|
||||
requestId: tracker.requestId,
|
||||
messageId: userMessageIdToUse,
|
||||
})
|
||||
try {
|
||||
reqLogger.info('Received chat POST', {
|
||||
workflowId,
|
||||
hasContexts: Array.isArray(normalizedContexts),
|
||||
contextsCount: Array.isArray(normalizedContexts) ? normalizedContexts.length : 0,
|
||||
contextsPreview: Array.isArray(normalizedContexts)
|
||||
? normalizedContexts.map((c: any) => ({
|
||||
kind: c?.kind,
|
||||
chatId: c?.chatId,
|
||||
workflowId: c?.workflowId,
|
||||
executionId: (c as any)?.executionId,
|
||||
label: c?.label,
|
||||
}))
|
||||
: undefined,
|
||||
})
|
||||
} catch {}
|
||||
|
||||
let currentChat: any = null
|
||||
let conversationHistory: any[] = []
|
||||
actualChatId = chatId
|
||||
userMessageIdToUse = userMessageId || crypto.randomUUID()
|
||||
const selectedModel = model || 'claude-opus-4-6'
|
||||
|
||||
logger.info(`[${tracker.requestId}] Received chat POST`, {
|
||||
workflowId,
|
||||
contextsCount: Array.isArray(normalizedContexts) ? normalizedContexts.length : 0,
|
||||
})
|
||||
|
||||
// 4. Resolve or create chat
|
||||
let currentChat: ChatLoadResult['chat'] = null
|
||||
let conversationHistory: unknown[] = []
|
||||
actualChatId = chatId
|
||||
|
||||
if (chatId || createNewChat) {
|
||||
const chatResult = await resolveOrCreateChat({
|
||||
chatId,
|
||||
@@ -250,37 +205,48 @@ export async function POST(req: NextRequest) {
|
||||
}
|
||||
}
|
||||
|
||||
if (actualChatId) {
|
||||
chatStreamLockAcquired = await acquirePendingChatStream(actualChatId, userMessageIdToUse)
|
||||
if (!chatStreamLockAcquired) {
|
||||
const activeStreamId = await getPendingChatStreamId(actualChatId)
|
||||
return NextResponse.json(
|
||||
{
|
||||
error: 'A response is already in progress for this chat.',
|
||||
...(activeStreamId ? { activeStreamId } : {}),
|
||||
},
|
||||
{ status: 409 }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// 5. Process contexts
|
||||
let agentContexts: Array<{ type: string; content: string }> = []
|
||||
|
||||
if (Array.isArray(normalizedContexts) && normalizedContexts.length > 0) {
|
||||
try {
|
||||
const { processContextsServer } = await import('@/lib/copilot/process-contents')
|
||||
const processed = await processContextsServer(
|
||||
normalizedContexts as any,
|
||||
normalizedContexts as ChatContext[],
|
||||
authenticatedUserId,
|
||||
message,
|
||||
resolvedWorkspaceId,
|
||||
actualChatId
|
||||
)
|
||||
agentContexts = processed
|
||||
reqLogger.info('Contexts processed for request', {
|
||||
logger.info(`[${tracker.requestId}] Contexts processed`, {
|
||||
processedCount: agentContexts.length,
|
||||
kinds: agentContexts.map((c) => c.type),
|
||||
lengthPreview: agentContexts.map((c) => c.content?.length ?? 0),
|
||||
})
|
||||
if (
|
||||
Array.isArray(normalizedContexts) &&
|
||||
normalizedContexts.length > 0 &&
|
||||
agentContexts.length === 0
|
||||
) {
|
||||
reqLogger.warn(
|
||||
'Contexts provided but none processed. Check executionId for logs contexts.'
|
||||
if (agentContexts.length === 0) {
|
||||
logger.warn(
|
||||
`[${tracker.requestId}] Contexts provided but none processed. Check executionId for logs contexts.`
|
||||
)
|
||||
}
|
||||
} catch (e) {
|
||||
reqLogger.error('Failed to process contexts', e)
|
||||
logger.error(`[${tracker.requestId}] Failed to process contexts`, e)
|
||||
}
|
||||
}
|
||||
|
||||
// 5b. Process resource attachments
|
||||
if (
|
||||
Array.isArray(resourceAttachments) &&
|
||||
resourceAttachments.length > 0 &&
|
||||
@@ -296,26 +262,30 @@ export async function POST(req: NextRequest) {
|
||||
actualChatId
|
||||
)
|
||||
if (!ctx) return null
|
||||
return {
|
||||
...ctx,
|
||||
tag: r.active ? '@active_tab' : '@open_tab',
|
||||
}
|
||||
return { ...ctx, tag: r.active ? '@active_tab' : '@open_tab' }
|
||||
})
|
||||
)
|
||||
for (const result of results) {
|
||||
if (result.status === 'fulfilled' && result.value) {
|
||||
agentContexts.push(result.value)
|
||||
} else if (result.status === 'rejected') {
|
||||
reqLogger.error('Failed to resolve resource attachment', result.reason)
|
||||
logger.error(
|
||||
`[${tracker.requestId}] Failed to resolve resource attachment`,
|
||||
result.reason
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const effectiveMode = mode === 'agent' ? 'build' : mode
|
||||
|
||||
// 6. Build copilot request payload
|
||||
const userPermission = resolvedWorkspaceId
|
||||
? await getUserEntityPermissions(authenticatedUserId, 'workspace', resolvedWorkspaceId).catch(
|
||||
() => null
|
||||
(err) => {
|
||||
logger.warn('Failed to load user permissions', {
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
})
|
||||
return null
|
||||
}
|
||||
)
|
||||
: null
|
||||
|
||||
@@ -339,55 +309,24 @@ export async function POST(req: NextRequest) {
|
||||
userPermission: userPermission ?? undefined,
|
||||
userTimezone,
|
||||
},
|
||||
{
|
||||
selectedModel,
|
||||
}
|
||||
{ selectedModel }
|
||||
)
|
||||
|
||||
try {
|
||||
reqLogger.info('About to call Sim Agent', {
|
||||
hasContext: agentContexts.length > 0,
|
||||
contextCount: agentContexts.length,
|
||||
hasFileAttachments: Array.isArray(requestPayload.fileAttachments),
|
||||
messageLength: message.length,
|
||||
mode: effectiveMode,
|
||||
hasTools: Array.isArray(requestPayload.tools),
|
||||
toolCount: Array.isArray(requestPayload.tools) ? requestPayload.tools.length : 0,
|
||||
hasBaseTools: Array.isArray(requestPayload.baseTools),
|
||||
baseToolCount: Array.isArray(requestPayload.baseTools)
|
||||
? requestPayload.baseTools.length
|
||||
: 0,
|
||||
hasCredentials: !!requestPayload.credentials,
|
||||
})
|
||||
} catch {}
|
||||
|
||||
if (stream && actualChatId) {
|
||||
const acquired = await acquirePendingChatStream(actualChatId, userMessageIdToUse)
|
||||
if (!acquired) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
error:
|
||||
'A response is already in progress for this chat. Wait for it to finish or use Stop.',
|
||||
},
|
||||
{ status: 409 }
|
||||
)
|
||||
}
|
||||
pendingChatStreamAcquired = true
|
||||
pendingChatStreamID = userMessageIdToUse
|
||||
}
|
||||
logger.info(`[${tracker.requestId}] About to call Sim Agent`, {
|
||||
contextCount: agentContexts.length,
|
||||
hasFileAttachments: Array.isArray(requestPayload.fileAttachments),
|
||||
messageLength: message.length,
|
||||
mode,
|
||||
})
|
||||
|
||||
// 7. Persist user message
|
||||
if (actualChatId) {
|
||||
const userMsg = {
|
||||
const userMsg = buildPersistedUserMessage({
|
||||
id: userMessageIdToUse,
|
||||
role: 'user' as const,
|
||||
content: message,
|
||||
timestamp: new Date().toISOString(),
|
||||
...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }),
|
||||
...(Array.isArray(normalizedContexts) &&
|
||||
normalizedContexts.length > 0 && {
|
||||
contexts: normalizedContexts,
|
||||
}),
|
||||
}
|
||||
fileAttachments,
|
||||
contexts: normalizedContexts,
|
||||
})
|
||||
|
||||
const [updated] = await db
|
||||
.update(copilotChats)
|
||||
@@ -400,268 +339,66 @@ export async function POST(req: NextRequest) {
|
||||
.returning({ messages: copilotChats.messages })
|
||||
|
||||
if (updated) {
|
||||
const freshMessages: any[] = Array.isArray(updated.messages) ? updated.messages : []
|
||||
conversationHistory = freshMessages.filter((m: any) => m.id !== userMessageIdToUse)
|
||||
const freshMessages: Record<string, unknown>[] = Array.isArray(updated.messages)
|
||||
? updated.messages
|
||||
: []
|
||||
conversationHistory = freshMessages.filter(
|
||||
(m: Record<string, unknown>) => m.id !== userMessageIdToUse
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
if (stream) {
|
||||
const executionId = crypto.randomUUID()
|
||||
const runId = crypto.randomUUID()
|
||||
const sseStream = createSSEStream({
|
||||
requestPayload,
|
||||
userId: authenticatedUserId,
|
||||
streamId: userMessageIdToUse,
|
||||
executionId,
|
||||
runId,
|
||||
chatId: actualChatId,
|
||||
currentChat,
|
||||
isNewChat: conversationHistory.length === 0,
|
||||
message,
|
||||
titleModel: selectedModel,
|
||||
titleProvider: provider,
|
||||
requestId: tracker.requestId,
|
||||
workspaceId: resolvedWorkspaceId,
|
||||
pendingChatStreamAlreadyRegistered: Boolean(actualChatId && stream),
|
||||
orchestrateOptions: {
|
||||
userId: authenticatedUserId,
|
||||
workflowId,
|
||||
chatId: actualChatId,
|
||||
executionId,
|
||||
runId,
|
||||
goRoute: '/api/copilot',
|
||||
autoExecuteTools: true,
|
||||
interactive: true,
|
||||
onComplete: async (result: OrchestratorResult) => {
|
||||
if (!actualChatId) return
|
||||
if (!result.success) return
|
||||
// 8. Create SSE stream with onComplete for assistant message persistence
|
||||
const executionId = crypto.randomUUID()
|
||||
const runId = crypto.randomUUID()
|
||||
|
||||
const assistantMessage: Record<string, unknown> = {
|
||||
id: crypto.randomUUID(),
|
||||
role: 'assistant' as const,
|
||||
content: result.content,
|
||||
timestamp: new Date().toISOString(),
|
||||
...(result.requestId ? { requestId: result.requestId } : {}),
|
||||
}
|
||||
if (result.toolCalls.length > 0) {
|
||||
assistantMessage.toolCalls = result.toolCalls
|
||||
}
|
||||
if (result.contentBlocks.length > 0) {
|
||||
assistantMessage.contentBlocks = result.contentBlocks.map((block) => {
|
||||
const stored: Record<string, unknown> = { type: block.type }
|
||||
if (block.content) stored.content = block.content
|
||||
if (block.type === 'tool_call' && block.toolCall) {
|
||||
const state =
|
||||
block.toolCall.result?.success !== undefined
|
||||
? block.toolCall.result.success
|
||||
? 'success'
|
||||
: 'error'
|
||||
: block.toolCall.status
|
||||
const isSubagentTool = !!block.calledBy
|
||||
const isNonTerminal =
|
||||
state === 'cancelled' || state === 'pending' || state === 'executing'
|
||||
stored.toolCall = {
|
||||
id: block.toolCall.id,
|
||||
name: block.toolCall.name,
|
||||
state,
|
||||
...(isSubagentTool && isNonTerminal ? {} : { result: block.toolCall.result }),
|
||||
...(isSubagentTool && isNonTerminal
|
||||
? {}
|
||||
: block.toolCall.params
|
||||
? { params: block.toolCall.params }
|
||||
: {}),
|
||||
...(block.calledBy ? { calledBy: block.calledBy } : {}),
|
||||
}
|
||||
}
|
||||
return stored
|
||||
})
|
||||
}
|
||||
|
||||
try {
|
||||
const [row] = await db
|
||||
.select({ messages: copilotChats.messages })
|
||||
.from(copilotChats)
|
||||
.where(eq(copilotChats.id, actualChatId))
|
||||
.limit(1)
|
||||
|
||||
const msgs: any[] = Array.isArray(row?.messages) ? row.messages : []
|
||||
const userIdx = msgs.findIndex((m: any) => m.id === userMessageIdToUse)
|
||||
const alreadyHasResponse =
|
||||
userIdx >= 0 &&
|
||||
userIdx + 1 < msgs.length &&
|
||||
(msgs[userIdx + 1] as any)?.role === 'assistant'
|
||||
|
||||
if (!alreadyHasResponse) {
|
||||
await db
|
||||
.update(copilotChats)
|
||||
.set({
|
||||
messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`,
|
||||
conversationId: sql`CASE WHEN ${copilotChats.conversationId} = ${userMessageIdToUse} THEN NULL ELSE ${copilotChats.conversationId} END`,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(copilotChats.id, actualChatId))
|
||||
}
|
||||
} catch (error) {
|
||||
reqLogger.error('Failed to persist chat messages', {
|
||||
chatId: actualChatId,
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
})
|
||||
}
|
||||
},
|
||||
},
|
||||
})
|
||||
pendingChatStreamHandedOff = true
|
||||
|
||||
return new Response(sseStream, { headers: SSE_RESPONSE_HEADERS })
|
||||
}
|
||||
|
||||
const nsExecutionId = crypto.randomUUID()
|
||||
const nsRunId = crypto.randomUUID()
|
||||
|
||||
if (actualChatId) {
|
||||
await createRunSegment({
|
||||
id: nsRunId,
|
||||
executionId: nsExecutionId,
|
||||
chatId: actualChatId,
|
||||
const sseStream = createSSEStream({
|
||||
requestPayload,
|
||||
userId: authenticatedUserId,
|
||||
streamId: userMessageIdToUse,
|
||||
executionId,
|
||||
runId,
|
||||
chatId: actualChatId,
|
||||
currentChat,
|
||||
isNewChat: conversationHistory.length === 0,
|
||||
message,
|
||||
titleModel: selectedModel,
|
||||
titleProvider: provider,
|
||||
requestId: tracker.requestId,
|
||||
workspaceId: resolvedWorkspaceId,
|
||||
orchestrateOptions: {
|
||||
userId: authenticatedUserId,
|
||||
workflowId,
|
||||
streamId: userMessageIdToUse,
|
||||
}).catch(() => {})
|
||||
}
|
||||
|
||||
const nonStreamingResult = await orchestrateCopilotStream(requestPayload, {
|
||||
userId: authenticatedUserId,
|
||||
workflowId,
|
||||
chatId: actualChatId,
|
||||
executionId: nsExecutionId,
|
||||
runId: nsRunId,
|
||||
goRoute: '/api/copilot',
|
||||
autoExecuteTools: true,
|
||||
interactive: true,
|
||||
})
|
||||
|
||||
const responseData = {
|
||||
content: nonStreamingResult.content,
|
||||
toolCalls: nonStreamingResult.toolCalls,
|
||||
model: selectedModel,
|
||||
provider: typeof requestPayload?.provider === 'string' ? requestPayload.provider : undefined,
|
||||
}
|
||||
|
||||
reqLogger.info('Non-streaming response from orchestrator', {
|
||||
hasContent: !!responseData.content,
|
||||
contentLength: responseData.content?.length || 0,
|
||||
model: responseData.model,
|
||||
provider: responseData.provider,
|
||||
toolCallsCount: responseData.toolCalls?.length || 0,
|
||||
})
|
||||
|
||||
// Save messages if we have a chat
|
||||
if (currentChat && responseData.content) {
|
||||
const userMessage = {
|
||||
id: userMessageIdToUse, // Consistent ID used for request and persistence
|
||||
role: 'user',
|
||||
content: message,
|
||||
timestamp: new Date().toISOString(),
|
||||
...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }),
|
||||
...(Array.isArray(normalizedContexts) &&
|
||||
normalizedContexts.length > 0 && {
|
||||
contexts: normalizedContexts,
|
||||
}),
|
||||
...(Array.isArray(normalizedContexts) &&
|
||||
normalizedContexts.length > 0 && {
|
||||
contentBlocks: [
|
||||
{ type: 'contexts', contexts: normalizedContexts as any, timestamp: Date.now() },
|
||||
],
|
||||
}),
|
||||
}
|
||||
|
||||
const assistantMessage = {
|
||||
id: crypto.randomUUID(),
|
||||
role: 'assistant',
|
||||
content: responseData.content,
|
||||
timestamp: new Date().toISOString(),
|
||||
}
|
||||
|
||||
const updatedMessages = [...conversationHistory, userMessage, assistantMessage]
|
||||
|
||||
// Start title generation in parallel if this is first message (non-streaming)
|
||||
if (actualChatId && !currentChat.title && conversationHistory.length === 0) {
|
||||
reqLogger.info('Starting title generation for non-streaming response')
|
||||
requestChatTitle({ message, model: selectedModel, provider, messageId: userMessageIdToUse })
|
||||
.then(async (title) => {
|
||||
if (title) {
|
||||
await db
|
||||
.update(copilotChats)
|
||||
.set({
|
||||
title,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(copilotChats.id, actualChatId!))
|
||||
reqLogger.info(`Generated and saved title: ${title}`)
|
||||
}
|
||||
})
|
||||
.catch((error) => {
|
||||
reqLogger.error('Title generation failed', error)
|
||||
})
|
||||
}
|
||||
|
||||
// Update chat in database immediately (without blocking for title)
|
||||
await db
|
||||
.update(copilotChats)
|
||||
.set({
|
||||
messages: updatedMessages,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(copilotChats.id, actualChatId!))
|
||||
}
|
||||
|
||||
reqLogger.info('Returning non-streaming response', {
|
||||
duration: tracker.getDuration(),
|
||||
chatId: actualChatId,
|
||||
responseLength: responseData.content?.length || 0,
|
||||
})
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
response: responseData,
|
||||
chatId: actualChatId,
|
||||
metadata: {
|
||||
requestId: tracker.requestId,
|
||||
message,
|
||||
duration: tracker.getDuration(),
|
||||
chatId: actualChatId,
|
||||
executionId,
|
||||
runId,
|
||||
goRoute: '/api/copilot',
|
||||
autoExecuteTools: true,
|
||||
interactive: true,
|
||||
onComplete: buildOnComplete(actualChatId, userMessageIdToUse, tracker.requestId),
|
||||
},
|
||||
})
|
||||
|
||||
return new Response(sseStream, { headers: SSE_RESPONSE_HEADERS })
|
||||
} catch (error) {
|
||||
if (
|
||||
actualChatId &&
|
||||
pendingChatStreamAcquired &&
|
||||
!pendingChatStreamHandedOff &&
|
||||
pendingChatStreamID
|
||||
) {
|
||||
await releasePendingChatStream(actualChatId, pendingChatStreamID).catch(() => {})
|
||||
if (chatStreamLockAcquired && actualChatId && userMessageIdToUse) {
|
||||
await releasePendingChatStream(actualChatId, userMessageIdToUse)
|
||||
}
|
||||
const duration = tracker.getDuration()
|
||||
|
||||
if (error instanceof z.ZodError) {
|
||||
logger
|
||||
.withMetadata({ requestId: tracker.requestId, messageId: pendingChatStreamID ?? undefined })
|
||||
.error('Validation error', {
|
||||
duration,
|
||||
errors: error.errors,
|
||||
})
|
||||
logger.error(`[${tracker.requestId}] Validation error:`, { duration, errors: error.errors })
|
||||
return NextResponse.json(
|
||||
{ error: 'Invalid request data', details: error.errors },
|
||||
{ status: 400 }
|
||||
)
|
||||
}
|
||||
|
||||
logger
|
||||
.withMetadata({ requestId: tracker.requestId, messageId: pendingChatStreamID ?? undefined })
|
||||
.error('Error handling copilot chat', {
|
||||
duration,
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
})
|
||||
logger.error(`[${tracker.requestId}] Error handling copilot chat:`, {
|
||||
duration,
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
})
|
||||
|
||||
return NextResponse.json(
|
||||
{ error: error instanceof Error ? error.message : 'Internal server error' },
|
||||
@@ -670,132 +407,55 @@ export async function POST(req: NextRequest) {
|
||||
}
|
||||
}
|
||||
|
||||
export async function GET(req: NextRequest) {
|
||||
try {
|
||||
const { searchParams } = new URL(req.url)
|
||||
const workflowId = searchParams.get('workflowId')
|
||||
const workspaceId = searchParams.get('workspaceId')
|
||||
const chatId = searchParams.get('chatId')
|
||||
// ---------------------------------------------------------------------------
|
||||
// onComplete: persist assistant message after streaming finishes
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const { userId: authenticatedUserId, isAuthenticated } =
|
||||
await authenticateCopilotRequestSessionOnly()
|
||||
if (!isAuthenticated || !authenticatedUserId) {
|
||||
return createUnauthorizedResponse()
|
||||
}
|
||||
function buildOnComplete(
|
||||
chatId: string | undefined,
|
||||
userMessageId: string,
|
||||
requestId: string
|
||||
): (result: OrchestratorResult) => Promise<void> {
|
||||
return async (result) => {
|
||||
if (!chatId || !result.success) return
|
||||
|
||||
if (chatId) {
|
||||
const chat = await getAccessibleCopilotChat(chatId, authenticatedUserId)
|
||||
const assistantMessage = buildPersistedAssistantMessage(result, result.requestId)
|
||||
|
||||
if (!chat) {
|
||||
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
|
||||
try {
|
||||
const [row] = await db
|
||||
.select({ messages: copilotChats.messages })
|
||||
.from(copilotChats)
|
||||
.where(eq(copilotChats.id, chatId))
|
||||
.limit(1)
|
||||
|
||||
const msgs: Record<string, unknown>[] = Array.isArray(row?.messages) ? row.messages : []
|
||||
const userIdx = msgs.findIndex((m: Record<string, unknown>) => m.id === userMessageId)
|
||||
const alreadyHasResponse =
|
||||
userIdx >= 0 &&
|
||||
userIdx + 1 < msgs.length &&
|
||||
(msgs[userIdx + 1] as Record<string, unknown>)?.role === 'assistant'
|
||||
|
||||
if (!alreadyHasResponse) {
|
||||
await db
|
||||
.update(copilotChats)
|
||||
.set({
|
||||
messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`,
|
||||
conversationId: sql`CASE WHEN ${copilotChats.conversationId} = ${userMessageId} THEN NULL ELSE ${copilotChats.conversationId} END`,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(copilotChats.id, chatId))
|
||||
}
|
||||
|
||||
let streamSnapshot: {
|
||||
events: Array<{ eventId: number; streamId: string; event: Record<string, unknown> }>
|
||||
status: string
|
||||
} | null = null
|
||||
|
||||
if (chat.conversationId) {
|
||||
try {
|
||||
const [meta, events] = await Promise.all([
|
||||
getStreamMeta(chat.conversationId),
|
||||
readStreamEvents(chat.conversationId, 0),
|
||||
])
|
||||
streamSnapshot = {
|
||||
events: events || [],
|
||||
status: meta?.status || 'unknown',
|
||||
}
|
||||
} catch (err) {
|
||||
logger
|
||||
.withMetadata({ messageId: chat.conversationId || undefined })
|
||||
.warn('Failed to read stream snapshot for chat', {
|
||||
chatId,
|
||||
conversationId: chat.conversationId,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
const transformedChat = {
|
||||
id: chat.id,
|
||||
title: chat.title,
|
||||
model: chat.model,
|
||||
messages: Array.isArray(chat.messages) ? chat.messages : [],
|
||||
messageCount: Array.isArray(chat.messages) ? chat.messages.length : 0,
|
||||
planArtifact: chat.planArtifact || null,
|
||||
config: chat.config || null,
|
||||
conversationId: chat.conversationId || null,
|
||||
resources: Array.isArray(chat.resources) ? chat.resources : [],
|
||||
createdAt: chat.createdAt,
|
||||
updatedAt: chat.updatedAt,
|
||||
...(streamSnapshot ? { streamSnapshot } : {}),
|
||||
}
|
||||
|
||||
logger
|
||||
.withMetadata({ messageId: chat.conversationId || undefined })
|
||||
.info(`Retrieved chat ${chatId}`)
|
||||
return NextResponse.json({ success: true, chat: transformedChat })
|
||||
}
|
||||
|
||||
if (!workflowId && !workspaceId) {
|
||||
return createBadRequestResponse('workflowId, workspaceId, or chatId is required')
|
||||
}
|
||||
|
||||
if (workspaceId) {
|
||||
await assertActiveWorkspaceAccess(workspaceId, authenticatedUserId)
|
||||
}
|
||||
|
||||
if (workflowId) {
|
||||
const authorization = await authorizeWorkflowByWorkspacePermission({
|
||||
workflowId,
|
||||
userId: authenticatedUserId,
|
||||
action: 'read',
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Failed to persist chat messages`, {
|
||||
chatId,
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
})
|
||||
if (!authorization.allowed) {
|
||||
return createUnauthorizedResponse()
|
||||
}
|
||||
}
|
||||
|
||||
const scopeFilter = workflowId
|
||||
? eq(copilotChats.workflowId, workflowId)
|
||||
: eq(copilotChats.workspaceId, workspaceId!)
|
||||
|
||||
const chats = await db
|
||||
.select({
|
||||
id: copilotChats.id,
|
||||
title: copilotChats.title,
|
||||
model: copilotChats.model,
|
||||
messages: copilotChats.messages,
|
||||
planArtifact: copilotChats.planArtifact,
|
||||
config: copilotChats.config,
|
||||
createdAt: copilotChats.createdAt,
|
||||
updatedAt: copilotChats.updatedAt,
|
||||
})
|
||||
.from(copilotChats)
|
||||
.where(and(eq(copilotChats.userId, authenticatedUserId), scopeFilter))
|
||||
.orderBy(desc(copilotChats.updatedAt))
|
||||
|
||||
const transformedChats = chats.map((chat) => ({
|
||||
id: chat.id,
|
||||
title: chat.title,
|
||||
model: chat.model,
|
||||
messages: Array.isArray(chat.messages) ? chat.messages : [],
|
||||
messageCount: Array.isArray(chat.messages) ? chat.messages.length : 0,
|
||||
planArtifact: chat.planArtifact || null,
|
||||
config: chat.config || null,
|
||||
createdAt: chat.createdAt,
|
||||
updatedAt: chat.updatedAt,
|
||||
}))
|
||||
|
||||
const scope = workflowId ? `workflow ${workflowId}` : `workspace ${workspaceId}`
|
||||
logger.info(`Retrieved ${transformedChats.length} chats for ${scope}`)
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
chats: transformedChats,
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error('Error fetching copilot chats', error)
|
||||
return createInternalServerErrorResponse('Failed to fetch chats')
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// GET handler (read-only queries, extracted to queries.ts)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export { GET } from './queries'
|
||||
|
||||
@@ -4,25 +4,67 @@
|
||||
|
||||
import { NextRequest } from 'next/server'
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
import {
|
||||
MothershipStreamV1CompletionStatus,
|
||||
MothershipStreamV1EventType,
|
||||
} from '@/lib/copilot/generated/mothership-stream-v1'
|
||||
|
||||
const { getStreamMeta, readStreamEvents, authenticateCopilotRequestSessionOnly } = vi.hoisted(
|
||||
() => ({
|
||||
getStreamMeta: vi.fn(),
|
||||
readStreamEvents: vi.fn(),
|
||||
authenticateCopilotRequestSessionOnly: vi.fn(),
|
||||
})
|
||||
)
|
||||
|
||||
vi.mock('@/lib/copilot/orchestrator/stream/buffer', () => ({
|
||||
getStreamMeta,
|
||||
readStreamEvents,
|
||||
const {
|
||||
getLatestRunForStream,
|
||||
readEvents,
|
||||
checkForReplayGap,
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
} = vi.hoisted(() => ({
|
||||
getLatestRunForStream: vi.fn(),
|
||||
readEvents: vi.fn(),
|
||||
checkForReplayGap: vi.fn(),
|
||||
authenticateCopilotRequestSessionOnly: vi.fn(),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/copilot/request-helpers', () => ({
|
||||
vi.mock('@/lib/copilot/async-runs/repository', () => ({
|
||||
getLatestRunForStream,
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/copilot/request/session', () => ({
|
||||
readEvents,
|
||||
checkForReplayGap,
|
||||
createEvent: (event: Record<string, unknown>) => ({
|
||||
stream: {
|
||||
streamId: event.streamId,
|
||||
cursor: event.cursor,
|
||||
},
|
||||
seq: event.seq,
|
||||
trace: { requestId: event.requestId ?? '' },
|
||||
type: event.type,
|
||||
payload: event.payload,
|
||||
}),
|
||||
encodeSSEEnvelope: (event: Record<string, unknown>) =>
|
||||
new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`),
|
||||
SSE_RESPONSE_HEADERS: {
|
||||
'Content-Type': 'text/event-stream',
|
||||
},
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/copilot/request/http', () => ({
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
}))
|
||||
|
||||
import { GET } from '@/app/api/copilot/chat/stream/route'
|
||||
import { GET } from './route'
|
||||
|
||||
async function readAllChunks(response: Response): Promise<string[]> {
|
||||
const reader = response.body?.getReader()
|
||||
expect(reader).toBeTruthy()
|
||||
|
||||
const chunks: string[] = []
|
||||
while (true) {
|
||||
const { done, value } = await reader!.read()
|
||||
if (done) {
|
||||
break
|
||||
}
|
||||
chunks.push(new TextDecoder().decode(value))
|
||||
}
|
||||
return chunks
|
||||
}
|
||||
|
||||
describe('copilot chat stream replay route', () => {
|
||||
beforeEach(() => {
|
||||
@@ -31,29 +73,54 @@ describe('copilot chat stream replay route', () => {
|
||||
userId: 'user-1',
|
||||
isAuthenticated: true,
|
||||
})
|
||||
readStreamEvents.mockResolvedValue([])
|
||||
readEvents.mockResolvedValue([])
|
||||
checkForReplayGap.mockResolvedValue(null)
|
||||
})
|
||||
|
||||
it('stops replay polling when stream meta becomes cancelled', async () => {
|
||||
getStreamMeta
|
||||
it('stops replay polling when run becomes cancelled', async () => {
|
||||
getLatestRunForStream
|
||||
.mockResolvedValueOnce({
|
||||
status: 'active',
|
||||
userId: 'user-1',
|
||||
executionId: 'exec-1',
|
||||
id: 'run-1',
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
status: 'cancelled',
|
||||
userId: 'user-1',
|
||||
executionId: 'exec-1',
|
||||
id: 'run-1',
|
||||
})
|
||||
|
||||
const response = await GET(
|
||||
new NextRequest('http://localhost:3000/api/copilot/chat/stream?streamId=stream-1')
|
||||
new NextRequest('http://localhost:3000/api/copilot/chat/stream?streamId=stream-1&after=0')
|
||||
)
|
||||
|
||||
const reader = response.body?.getReader()
|
||||
expect(reader).toBeTruthy()
|
||||
const chunks = await readAllChunks(response)
|
||||
expect(chunks.join('')).toContain(
|
||||
JSON.stringify({
|
||||
status: MothershipStreamV1CompletionStatus.cancelled,
|
||||
reason: 'terminal_status',
|
||||
})
|
||||
)
|
||||
expect(getLatestRunForStream).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
const first = await reader!.read()
|
||||
expect(first.done).toBe(true)
|
||||
expect(getStreamMeta).toHaveBeenCalledTimes(2)
|
||||
it('emits structured terminal replay error when run metadata disappears', async () => {
|
||||
getLatestRunForStream
|
||||
.mockResolvedValueOnce({
|
||||
status: 'active',
|
||||
executionId: 'exec-1',
|
||||
id: 'run-1',
|
||||
})
|
||||
.mockResolvedValueOnce(null)
|
||||
|
||||
const response = await GET(
|
||||
new NextRequest('http://localhost:3000/api/copilot/chat/stream?streamId=stream-1&after=0')
|
||||
)
|
||||
|
||||
const chunks = await readAllChunks(response)
|
||||
const body = chunks.join('')
|
||||
expect(body).toContain(`"type":"${MothershipStreamV1EventType.error}"`)
|
||||
expect(body).toContain('"code":"resume_run_unavailable"')
|
||||
expect(body).toContain(`"type":"${MothershipStreamV1EventType.complete}"`)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -1,12 +1,18 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository'
|
||||
import {
|
||||
getStreamMeta,
|
||||
readStreamEvents,
|
||||
type StreamMeta,
|
||||
} from '@/lib/copilot/orchestrator/stream/buffer'
|
||||
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers'
|
||||
import { SSE_HEADERS } from '@/lib/core/utils/sse'
|
||||
MothershipStreamV1CompletionStatus,
|
||||
MothershipStreamV1EventType,
|
||||
} from '@/lib/copilot/generated/mothership-stream-v1'
|
||||
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request/http'
|
||||
import {
|
||||
checkForReplayGap,
|
||||
createEvent,
|
||||
encodeSSEEnvelope,
|
||||
readEvents,
|
||||
SSE_RESPONSE_HEADERS,
|
||||
} from '@/lib/copilot/request/session'
|
||||
|
||||
export const maxDuration = 3600
|
||||
|
||||
@@ -14,8 +20,59 @@ const logger = createLogger('CopilotChatStreamAPI')
|
||||
const POLL_INTERVAL_MS = 250
|
||||
const MAX_STREAM_MS = 60 * 60 * 1000
|
||||
|
||||
function encodeEvent(event: Record<string, any>): Uint8Array {
|
||||
return new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`)
|
||||
function isTerminalStatus(
|
||||
status: string | null | undefined
|
||||
): status is MothershipStreamV1CompletionStatus {
|
||||
return (
|
||||
status === MothershipStreamV1CompletionStatus.complete ||
|
||||
status === MothershipStreamV1CompletionStatus.error ||
|
||||
status === MothershipStreamV1CompletionStatus.cancelled
|
||||
)
|
||||
}
|
||||
|
||||
function buildResumeTerminalEnvelopes(options: {
|
||||
streamId: string
|
||||
afterCursor: string
|
||||
status: MothershipStreamV1CompletionStatus
|
||||
message?: string
|
||||
code: string
|
||||
reason?: string
|
||||
}) {
|
||||
const baseSeq = Number(options.afterCursor || '0')
|
||||
const seq = Number.isFinite(baseSeq) ? baseSeq : 0
|
||||
const envelopes: ReturnType<typeof createEvent>[] = []
|
||||
|
||||
if (options.status === MothershipStreamV1CompletionStatus.error) {
|
||||
envelopes.push(
|
||||
createEvent({
|
||||
streamId: options.streamId,
|
||||
cursor: String(seq + 1),
|
||||
seq: seq + 1,
|
||||
requestId: '',
|
||||
type: MothershipStreamV1EventType.error,
|
||||
payload: {
|
||||
message: options.message || 'Stream recovery failed before completion.',
|
||||
code: options.code,
|
||||
},
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
envelopes.push(
|
||||
createEvent({
|
||||
streamId: options.streamId,
|
||||
cursor: String(seq + envelopes.length + 1),
|
||||
seq: seq + envelopes.length + 1,
|
||||
requestId: '',
|
||||
type: MothershipStreamV1EventType.complete,
|
||||
payload: {
|
||||
status: options.status,
|
||||
...(options.reason ? { reason: options.reason } : {}),
|
||||
},
|
||||
})
|
||||
)
|
||||
|
||||
return envelopes
|
||||
}
|
||||
|
||||
export async function GET(request: NextRequest) {
|
||||
@@ -28,58 +85,49 @@ export async function GET(request: NextRequest) {
|
||||
|
||||
const url = new URL(request.url)
|
||||
const streamId = url.searchParams.get('streamId') || ''
|
||||
const fromParam = url.searchParams.get('from') || '0'
|
||||
const fromEventId = Number(fromParam || 0)
|
||||
// If batch=true, return buffered events as JSON instead of SSE
|
||||
const afterCursor = url.searchParams.get('after') || ''
|
||||
const batchMode = url.searchParams.get('batch') === 'true'
|
||||
const toParam = url.searchParams.get('to')
|
||||
const toEventId = toParam ? Number(toParam) : undefined
|
||||
|
||||
const reqLogger = logger.withMetadata({ messageId: streamId || undefined })
|
||||
|
||||
reqLogger.info('[Resume] Received resume request', {
|
||||
streamId: streamId || undefined,
|
||||
fromEventId,
|
||||
toEventId,
|
||||
batchMode,
|
||||
})
|
||||
|
||||
if (!streamId) {
|
||||
return NextResponse.json({ error: 'streamId is required' }, { status: 400 })
|
||||
}
|
||||
|
||||
const meta = (await getStreamMeta(streamId)) as StreamMeta | null
|
||||
reqLogger.info('[Resume] Stream lookup', {
|
||||
streamId,
|
||||
fromEventId,
|
||||
toEventId,
|
||||
batchMode,
|
||||
hasMeta: !!meta,
|
||||
metaStatus: meta?.status,
|
||||
const run = await getLatestRunForStream(streamId, authenticatedUserId).catch((err) => {
|
||||
logger.warn('Failed to fetch latest run for stream', {
|
||||
streamId,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
})
|
||||
return null
|
||||
})
|
||||
if (!meta) {
|
||||
logger.info('[Resume] Stream lookup', {
|
||||
streamId,
|
||||
afterCursor,
|
||||
batchMode,
|
||||
hasRun: !!run,
|
||||
runStatus: run?.status,
|
||||
})
|
||||
if (!run) {
|
||||
return NextResponse.json({ error: 'Stream not found' }, { status: 404 })
|
||||
}
|
||||
if (meta.userId && meta.userId !== authenticatedUserId) {
|
||||
return NextResponse.json({ error: 'Unauthorized' }, { status: 403 })
|
||||
}
|
||||
|
||||
// Batch mode: return all buffered events as JSON
|
||||
if (batchMode) {
|
||||
const events = await readStreamEvents(streamId, fromEventId)
|
||||
const filteredEvents = toEventId ? events.filter((e) => e.eventId <= toEventId) : events
|
||||
reqLogger.info('[Resume] Batch response', {
|
||||
const afterSeq = afterCursor || '0'
|
||||
const events = await readEvents(streamId, afterSeq)
|
||||
const batchEvents = events.map((envelope) => ({
|
||||
eventId: envelope.seq,
|
||||
streamId: envelope.stream.streamId,
|
||||
event: envelope,
|
||||
}))
|
||||
logger.info('[Resume] Batch response', {
|
||||
streamId,
|
||||
fromEventId,
|
||||
toEventId,
|
||||
eventCount: filteredEvents.length,
|
||||
afterCursor: afterSeq,
|
||||
eventCount: batchEvents.length,
|
||||
runStatus: run.status,
|
||||
})
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
events: filteredEvents,
|
||||
status: meta.status,
|
||||
executionId: meta.executionId,
|
||||
runId: meta.runId,
|
||||
events: batchEvents,
|
||||
status: run.status,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -87,9 +135,9 @@ export async function GET(request: NextRequest) {
|
||||
|
||||
const stream = new ReadableStream({
|
||||
async start(controller) {
|
||||
let lastEventId = Number.isFinite(fromEventId) ? fromEventId : 0
|
||||
let latestMeta = meta
|
||||
let cursor = afterCursor || '0'
|
||||
let controllerClosed = false
|
||||
let sawTerminalEvent = false
|
||||
|
||||
const closeController = () => {
|
||||
if (controllerClosed) return
|
||||
@@ -97,14 +145,14 @@ export async function GET(request: NextRequest) {
|
||||
try {
|
||||
controller.close()
|
||||
} catch {
|
||||
// Controller already closed by runtime/client - treat as normal.
|
||||
// Controller already closed by runtime/client
|
||||
}
|
||||
}
|
||||
|
||||
const enqueueEvent = (payload: Record<string, any>) => {
|
||||
const enqueueEvent = (payload: unknown) => {
|
||||
if (controllerClosed) return false
|
||||
try {
|
||||
controller.enqueue(encodeEvent(payload))
|
||||
controller.enqueue(encodeSSEEnvelope(payload))
|
||||
return true
|
||||
} catch {
|
||||
controllerClosed = true
|
||||
@@ -118,47 +166,96 @@ export async function GET(request: NextRequest) {
|
||||
request.signal.addEventListener('abort', abortListener, { once: true })
|
||||
|
||||
const flushEvents = async () => {
|
||||
const events = await readStreamEvents(streamId, lastEventId)
|
||||
const events = await readEvents(streamId, cursor)
|
||||
if (events.length > 0) {
|
||||
reqLogger.info('[Resume] Flushing events', {
|
||||
logger.info('[Resume] Flushing events', {
|
||||
streamId,
|
||||
fromEventId: lastEventId,
|
||||
afterCursor: cursor,
|
||||
eventCount: events.length,
|
||||
})
|
||||
}
|
||||
for (const entry of events) {
|
||||
lastEventId = entry.eventId
|
||||
const payload = {
|
||||
...entry.event,
|
||||
eventId: entry.eventId,
|
||||
streamId: entry.streamId,
|
||||
executionId: latestMeta?.executionId,
|
||||
runId: latestMeta?.runId,
|
||||
for (const envelope of events) {
|
||||
cursor = envelope.stream.cursor ?? String(envelope.seq)
|
||||
if (envelope.type === MothershipStreamV1EventType.complete) {
|
||||
sawTerminalEvent = true
|
||||
}
|
||||
if (!enqueueEvent(payload)) {
|
||||
if (!enqueueEvent(envelope)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const emitTerminalIfMissing = (
|
||||
status: MothershipStreamV1CompletionStatus,
|
||||
options?: { message?: string; code: string; reason?: string }
|
||||
) => {
|
||||
if (controllerClosed || sawTerminalEvent) {
|
||||
return
|
||||
}
|
||||
for (const envelope of buildResumeTerminalEnvelopes({
|
||||
streamId,
|
||||
afterCursor: cursor,
|
||||
status,
|
||||
message: options?.message,
|
||||
code: options?.code ?? 'resume_terminal',
|
||||
reason: options?.reason,
|
||||
})) {
|
||||
cursor = envelope.stream.cursor ?? String(envelope.seq)
|
||||
if (envelope.type === MothershipStreamV1EventType.complete) {
|
||||
sawTerminalEvent = true
|
||||
}
|
||||
if (!enqueueEvent(envelope)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const gap = await checkForReplayGap(streamId, afterCursor)
|
||||
if (gap) {
|
||||
for (const envelope of gap.envelopes) {
|
||||
enqueueEvent(envelope)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
await flushEvents()
|
||||
|
||||
while (!controllerClosed && Date.now() - startTime < MAX_STREAM_MS) {
|
||||
const currentMeta = await getStreamMeta(streamId)
|
||||
if (!currentMeta) break
|
||||
latestMeta = currentMeta
|
||||
const currentRun = await getLatestRunForStream(streamId, authenticatedUserId).catch(
|
||||
(err) => {
|
||||
logger.warn('Failed to poll latest run for stream', {
|
||||
streamId,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
})
|
||||
return null
|
||||
}
|
||||
)
|
||||
if (!currentRun) {
|
||||
emitTerminalIfMissing(MothershipStreamV1CompletionStatus.error, {
|
||||
message: 'The stream could not be recovered because its run metadata is unavailable.',
|
||||
code: 'resume_run_unavailable',
|
||||
reason: 'run_unavailable',
|
||||
})
|
||||
break
|
||||
}
|
||||
|
||||
await flushEvents()
|
||||
|
||||
if (controllerClosed) {
|
||||
break
|
||||
}
|
||||
if (
|
||||
currentMeta.status === 'complete' ||
|
||||
currentMeta.status === 'error' ||
|
||||
currentMeta.status === 'cancelled'
|
||||
) {
|
||||
if (isTerminalStatus(currentRun.status)) {
|
||||
emitTerminalIfMissing(currentRun.status, {
|
||||
message:
|
||||
currentRun.status === MothershipStreamV1CompletionStatus.error
|
||||
? typeof currentRun.error === 'string'
|
||||
? currentRun.error
|
||||
: 'The recovered stream ended with an error.'
|
||||
: undefined,
|
||||
code: 'resume_terminal_status',
|
||||
reason: 'terminal_status',
|
||||
})
|
||||
break
|
||||
}
|
||||
|
||||
@@ -169,12 +266,24 @@ export async function GET(request: NextRequest) {
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
|
||||
}
|
||||
if (!controllerClosed && Date.now() - startTime >= MAX_STREAM_MS) {
|
||||
emitTerminalIfMissing(MothershipStreamV1CompletionStatus.error, {
|
||||
message: 'The stream recovery timed out before completion.',
|
||||
code: 'resume_timeout',
|
||||
reason: 'timeout',
|
||||
})
|
||||
}
|
||||
} catch (error) {
|
||||
if (!controllerClosed && !request.signal.aborted) {
|
||||
reqLogger.warn('Stream replay failed', {
|
||||
logger.warn('Stream replay failed', {
|
||||
streamId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
emitTerminalIfMissing(MothershipStreamV1CompletionStatus.error, {
|
||||
message: 'The stream replay failed before completion.',
|
||||
code: 'resume_internal',
|
||||
reason: 'stream_replay_failed',
|
||||
})
|
||||
}
|
||||
} finally {
|
||||
request.signal.removeEventListener('abort', abortListener)
|
||||
@@ -183,5 +292,5 @@ export async function GET(request: NextRequest) {
|
||||
},
|
||||
})
|
||||
|
||||
return new Response(stream, { headers: SSE_HEADERS })
|
||||
return new Response(stream, { headers: SSE_RESPONSE_HEADERS })
|
||||
}
|
||||
|
||||
@@ -327,7 +327,35 @@ describe('Copilot Chat Update Messages API Route', () => {
|
||||
})
|
||||
|
||||
expect(mockSet).toHaveBeenCalledWith({
|
||||
messages,
|
||||
messages: [
|
||||
{
|
||||
id: 'msg-1',
|
||||
role: 'user',
|
||||
content: 'Hello',
|
||||
timestamp: '2024-01-01T10:00:00.000Z',
|
||||
},
|
||||
{
|
||||
id: 'msg-2',
|
||||
role: 'assistant',
|
||||
content: 'Hi there!',
|
||||
timestamp: '2024-01-01T10:01:00.000Z',
|
||||
contentBlocks: [
|
||||
{
|
||||
type: 'text',
|
||||
content: 'Here is the weather information',
|
||||
},
|
||||
{
|
||||
type: 'tool',
|
||||
phase: 'call',
|
||||
toolCall: {
|
||||
id: 'tool-1',
|
||||
name: 'get_weather',
|
||||
state: 'pending',
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
updatedAt: expect.any(Date),
|
||||
})
|
||||
})
|
||||
|
||||
@@ -4,15 +4,16 @@ import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { getAccessibleCopilotChat } from '@/lib/copilot/chat-lifecycle'
|
||||
import { COPILOT_MODES } from '@/lib/copilot/models'
|
||||
import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle'
|
||||
import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message'
|
||||
import { COPILOT_MODES } from '@/lib/copilot/constants'
|
||||
import {
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
createInternalServerErrorResponse,
|
||||
createNotFoundResponse,
|
||||
createRequestTracker,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request-helpers'
|
||||
} from '@/lib/copilot/request/http'
|
||||
|
||||
const logger = createLogger('CopilotChatUpdateAPI')
|
||||
|
||||
@@ -78,12 +79,15 @@ export async function POST(req: NextRequest) {
|
||||
}
|
||||
|
||||
const { chatId, messages, planArtifact, config } = UpdateMessagesSchema.parse(body)
|
||||
const normalizedMessages: PersistedMessage[] = messages.map((message) =>
|
||||
normalizeMessage(message as Record<string, unknown>)
|
||||
)
|
||||
|
||||
// Debug: Log what we're about to save
|
||||
const lastMsgParsed = messages[messages.length - 1]
|
||||
const lastMsgParsed = normalizedMessages[normalizedMessages.length - 1]
|
||||
if (lastMsgParsed?.role === 'assistant') {
|
||||
logger.info(`[${tracker.requestId}] Parsed messages to save`, {
|
||||
messageCount: messages.length,
|
||||
messageCount: normalizedMessages.length,
|
||||
lastMsgId: lastMsgParsed.id,
|
||||
lastMsgContentLength: lastMsgParsed.content?.length || 0,
|
||||
lastMsgContentBlockCount: lastMsgParsed.contentBlocks?.length || 0,
|
||||
@@ -99,8 +103,8 @@ export async function POST(req: NextRequest) {
|
||||
}
|
||||
|
||||
// Update chat with new messages, plan artifact, and config
|
||||
const updateData: Record<string, any> = {
|
||||
messages: messages,
|
||||
const updateData: Record<string, unknown> = {
|
||||
messages: normalizedMessages,
|
||||
updatedAt: new Date(),
|
||||
}
|
||||
|
||||
@@ -116,14 +120,14 @@ export async function POST(req: NextRequest) {
|
||||
|
||||
logger.info(`[${tracker.requestId}] Successfully updated chat`, {
|
||||
chatId,
|
||||
newMessageCount: messages.length,
|
||||
newMessageCount: normalizedMessages.length,
|
||||
hasPlanArtifact: !!planArtifact,
|
||||
hasConfig: !!config,
|
||||
})
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
messageCount: messages.length,
|
||||
messageCount: normalizedMessages.length,
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error(`[${tracker.requestId}] Error updating chat messages:`, error)
|
||||
|
||||
@@ -66,7 +66,7 @@ vi.mock('drizzle-orm', () => ({
|
||||
sql: vi.fn(),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/copilot/request-helpers', () => ({
|
||||
vi.mock('@/lib/copilot/request/http', () => ({
|
||||
authenticateCopilotRequestSessionOnly: mockAuthenticate,
|
||||
createUnauthorizedResponse: mockCreateUnauthorizedResponse,
|
||||
createInternalServerErrorResponse: mockCreateInternalServerErrorResponse,
|
||||
|
||||
@@ -4,14 +4,14 @@ import { createLogger } from '@sim/logger'
|
||||
import { and, desc, eq, isNull, or, sql } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
|
||||
import { resolveOrCreateChat } from '@/lib/copilot/chat/lifecycle'
|
||||
import {
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
createBadRequestResponse,
|
||||
createInternalServerErrorResponse,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request-helpers'
|
||||
import { taskPubSub } from '@/lib/copilot/task-events'
|
||||
} from '@/lib/copilot/request/http'
|
||||
import { taskPubSub } from '@/lib/copilot/tasks'
|
||||
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
|
||||
import { assertActiveWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
|
||||
|
||||
@@ -37,7 +37,7 @@ export async function GET(_request: NextRequest) {
|
||||
title: copilotChats.title,
|
||||
workflowId: copilotChats.workflowId,
|
||||
workspaceId: copilotChats.workspaceId,
|
||||
conversationId: copilotChats.conversationId,
|
||||
activeStreamId: copilotChats.conversationId,
|
||||
updatedAt: copilotChats.updatedAt,
|
||||
})
|
||||
.from(copilotChats)
|
||||
|
||||
@@ -43,7 +43,7 @@ vi.mock('@/lib/workflows/utils', () => ({
|
||||
authorizeWorkflowByWorkspacePermission: mockAuthorize,
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/copilot/chat-lifecycle', () => ({
|
||||
vi.mock('@/lib/copilot/chat/lifecycle', () => ({
|
||||
getAccessibleCopilotChat: mockGetAccessibleCopilotChat,
|
||||
}))
|
||||
|
||||
@@ -304,6 +304,7 @@ describe('Copilot Checkpoints Revert API Route', () => {
|
||||
loops: {},
|
||||
parallels: {},
|
||||
isDeployed: true,
|
||||
deploymentStatuses: { production: 'deployed' },
|
||||
},
|
||||
}
|
||||
|
||||
@@ -348,6 +349,7 @@ describe('Copilot Checkpoints Revert API Route', () => {
|
||||
loops: {},
|
||||
parallels: {},
|
||||
isDeployed: true,
|
||||
deploymentStatuses: { production: 'deployed' },
|
||||
lastSaved: 1640995200000,
|
||||
},
|
||||
},
|
||||
@@ -368,6 +370,7 @@ describe('Copilot Checkpoints Revert API Route', () => {
|
||||
loops: {},
|
||||
parallels: {},
|
||||
isDeployed: true,
|
||||
deploymentStatuses: { production: 'deployed' },
|
||||
lastSaved: 1640995200000,
|
||||
}),
|
||||
}
|
||||
@@ -470,6 +473,7 @@ describe('Copilot Checkpoints Revert API Route', () => {
|
||||
edges: undefined,
|
||||
loops: null,
|
||||
parallels: undefined,
|
||||
deploymentStatuses: null,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -504,6 +508,7 @@ describe('Copilot Checkpoints Revert API Route', () => {
|
||||
loops: {},
|
||||
parallels: {},
|
||||
isDeployed: false,
|
||||
deploymentStatuses: {},
|
||||
lastSaved: 1640995200000,
|
||||
})
|
||||
})
|
||||
@@ -763,6 +768,10 @@ describe('Copilot Checkpoints Revert API Route', () => {
|
||||
parallel1: { branches: ['branch1', 'branch2'] },
|
||||
},
|
||||
isDeployed: true,
|
||||
deploymentStatuses: {
|
||||
production: 'deployed',
|
||||
staging: 'pending',
|
||||
},
|
||||
deployedAt: '2024-01-01T10:00:00.000Z',
|
||||
},
|
||||
}
|
||||
@@ -807,6 +816,10 @@ describe('Copilot Checkpoints Revert API Route', () => {
|
||||
parallel1: { branches: ['branch1', 'branch2'] },
|
||||
},
|
||||
isDeployed: true,
|
||||
deploymentStatuses: {
|
||||
production: 'deployed',
|
||||
staging: 'pending',
|
||||
},
|
||||
deployedAt: '2024-01-01T10:00:00.000Z',
|
||||
lastSaved: 1640995200000,
|
||||
})
|
||||
|
||||
@@ -4,14 +4,14 @@ import { createLogger } from '@sim/logger'
|
||||
import { and, eq } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { getAccessibleCopilotChat } from '@/lib/copilot/chat-lifecycle'
|
||||
import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle'
|
||||
import {
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
createInternalServerErrorResponse,
|
||||
createNotFoundResponse,
|
||||
createRequestTracker,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request-helpers'
|
||||
} from '@/lib/copilot/request/http'
|
||||
import { getInternalApiBaseUrl } from '@/lib/core/utils/urls'
|
||||
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
|
||||
import { isUuidV4 } from '@/executor/constants'
|
||||
@@ -82,6 +82,7 @@ export async function POST(request: NextRequest) {
|
||||
loops: checkpointState?.loops || {},
|
||||
parallels: checkpointState?.parallels || {},
|
||||
isDeployed: checkpointState?.isDeployed || false,
|
||||
deploymentStatuses: checkpointState?.deploymentStatuses || {},
|
||||
lastSaved: Date.now(),
|
||||
...(checkpointState?.deployedAt &&
|
||||
checkpointState.deployedAt !== null &&
|
||||
|
||||
@@ -62,7 +62,7 @@ vi.mock('drizzle-orm', () => ({
|
||||
desc: vi.fn((field: unknown) => ({ field, type: 'desc' })),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/copilot/chat-lifecycle', () => ({
|
||||
vi.mock('@/lib/copilot/chat/lifecycle', () => ({
|
||||
getAccessibleCopilotChat: mockGetAccessibleCopilotChat,
|
||||
}))
|
||||
|
||||
|
||||
@@ -4,14 +4,14 @@ import { createLogger } from '@sim/logger'
|
||||
import { and, desc, eq } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { getAccessibleCopilotChat } from '@/lib/copilot/chat-lifecycle'
|
||||
import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle'
|
||||
import {
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
createBadRequestResponse,
|
||||
createInternalServerErrorResponse,
|
||||
createRequestTracker,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request-helpers'
|
||||
} from '@/lib/copilot/request/http'
|
||||
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
|
||||
|
||||
const logger = createLogger('WorkflowCheckpointsAPI')
|
||||
|
||||
@@ -38,7 +38,7 @@ const {
|
||||
publishToolConfirmation: vi.fn(),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/copilot/request-helpers', () => ({
|
||||
vi.mock('@/lib/copilot/request/http', () => ({
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
createBadRequestResponse,
|
||||
createInternalServerErrorResponse,
|
||||
@@ -54,7 +54,7 @@ vi.mock('@/lib/copilot/async-runs/repository', () => ({
|
||||
completeAsyncToolCall,
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/copilot/orchestrator/persistence', () => ({
|
||||
vi.mock('@/lib/copilot/persistence/tool-confirm', () => ({
|
||||
publishToolConfirmation,
|
||||
}))
|
||||
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { ASYNC_TOOL_STATUS } from '@/lib/copilot/async-runs/lifecycle'
|
||||
import {
|
||||
completeAsyncToolCall,
|
||||
getAsyncToolCall,
|
||||
getRunSegment,
|
||||
upsertAsyncToolCall,
|
||||
} from '@/lib/copilot/async-runs/repository'
|
||||
import { publishToolConfirmation } from '@/lib/copilot/orchestrator/persistence'
|
||||
import { publishToolConfirmation } from '@/lib/copilot/persistence/tool-confirm'
|
||||
import {
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
createBadRequestResponse,
|
||||
@@ -16,7 +17,7 @@ import {
|
||||
createRequestTracker,
|
||||
createUnauthorizedResponse,
|
||||
type NotificationStatus,
|
||||
} from '@/lib/copilot/request-helpers'
|
||||
} from '@/lib/copilot/request/http'
|
||||
|
||||
const logger = createLogger('CopilotConfirmAPI')
|
||||
|
||||
@@ -42,17 +43,17 @@ async function updateToolCallStatus(
|
||||
const toolCallId = existing.toolCallId
|
||||
const durableStatus =
|
||||
status === 'success'
|
||||
? 'completed'
|
||||
? ASYNC_TOOL_STATUS.completed
|
||||
: status === 'cancelled'
|
||||
? 'cancelled'
|
||||
? ASYNC_TOOL_STATUS.cancelled
|
||||
: status === 'error' || status === 'rejected'
|
||||
? 'failed'
|
||||
: 'pending'
|
||||
? ASYNC_TOOL_STATUS.failed
|
||||
: ASYNC_TOOL_STATUS.pending
|
||||
try {
|
||||
if (
|
||||
durableStatus === 'completed' ||
|
||||
durableStatus === 'failed' ||
|
||||
durableStatus === 'cancelled'
|
||||
durableStatus === ASYNC_TOOL_STATUS.completed ||
|
||||
durableStatus === ASYNC_TOOL_STATUS.failed ||
|
||||
durableStatus === ASYNC_TOOL_STATUS.cancelled
|
||||
) {
|
||||
await completeAsyncToolCall({
|
||||
toolCallId,
|
||||
@@ -107,13 +108,25 @@ export async function POST(req: NextRequest) {
|
||||
|
||||
const body = await req.json()
|
||||
const { toolCallId, status, message, data } = ConfirmationSchema.parse(body)
|
||||
const existing = await getAsyncToolCall(toolCallId).catch(() => null)
|
||||
const existing = await getAsyncToolCall(toolCallId).catch((err) => {
|
||||
logger.warn('Failed to fetch async tool call', {
|
||||
toolCallId,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
})
|
||||
return null
|
||||
})
|
||||
|
||||
if (!existing) {
|
||||
return createNotFoundResponse('Tool call not found')
|
||||
}
|
||||
|
||||
const run = await getRunSegment(existing.runId).catch(() => null)
|
||||
const run = await getRunSegment(existing.runId).catch((err) => {
|
||||
logger.warn('Failed to fetch run segment', {
|
||||
runId: existing.runId,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
})
|
||||
return null
|
||||
})
|
||||
if (!run) {
|
||||
return createNotFoundResponse('Tool call run not found')
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers'
|
||||
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request/http'
|
||||
import { routeExecution } from '@/lib/copilot/tools/server/router'
|
||||
|
||||
/**
|
||||
|
||||
@@ -57,7 +57,7 @@ vi.mock('drizzle-orm', () => ({
|
||||
eq: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'eq' })),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/copilot/request-helpers', () => ({
|
||||
vi.mock('@/lib/copilot/request/http', () => ({
|
||||
authenticateCopilotRequestSessionOnly: mockAuthenticate,
|
||||
createUnauthorizedResponse: mockCreateUnauthorizedResponse,
|
||||
createBadRequestResponse: mockCreateBadRequestResponse,
|
||||
|
||||
@@ -10,8 +10,7 @@ import {
|
||||
createInternalServerErrorResponse,
|
||||
createRequestTracker,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request-helpers'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
} from '@/lib/copilot/request/http'
|
||||
|
||||
const logger = createLogger('CopilotFeedbackAPI')
|
||||
|
||||
@@ -77,12 +76,6 @@ export async function POST(req: NextRequest) {
|
||||
duration: tracker.getDuration(),
|
||||
})
|
||||
|
||||
captureServerEvent(authenticatedUserId, 'copilot_feedback_submitted', {
|
||||
is_positive: isPositiveFeedback,
|
||||
has_text_feedback: !!feedback,
|
||||
has_workflow_yaml: !!workflowYaml,
|
||||
})
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
feedbackId: feedbackRecord.feedbackId,
|
||||
|
||||
@@ -1,8 +1,14 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { SIM_AGENT_API_URL } from '@/lib/copilot/constants'
|
||||
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers'
|
||||
import type { AvailableModel } from '@/lib/copilot/types'
|
||||
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request/http'
|
||||
|
||||
interface AvailableModel {
|
||||
id: string
|
||||
friendlyName: string
|
||||
provider: string
|
||||
}
|
||||
|
||||
import { env } from '@/lib/core/config/env'
|
||||
|
||||
const logger = createLogger('CopilotModelsAPI')
|
||||
|
||||
@@ -23,7 +23,7 @@ const {
|
||||
mockFetch: vi.fn(),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/copilot/request-helpers', () => ({
|
||||
vi.mock('@/lib/copilot/request/http', () => ({
|
||||
authenticateCopilotRequestSessionOnly: mockAuthenticateCopilotRequestSessionOnly,
|
||||
createUnauthorizedResponse: mockCreateUnauthorizedResponse,
|
||||
createBadRequestResponse: mockCreateBadRequestResponse,
|
||||
|
||||
@@ -7,7 +7,7 @@ import {
|
||||
createInternalServerErrorResponse,
|
||||
createRequestTracker,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request-helpers'
|
||||
} from '@/lib/copilot/request/http'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
|
||||
const BodySchema = z.object({
|
||||
|
||||
@@ -4,7 +4,7 @@ import { z } from 'zod'
|
||||
import {
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request-helpers'
|
||||
} from '@/lib/copilot/request/http'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
|
||||
const logger = createLogger('CopilotTrainingExamplesAPI')
|
||||
|
||||
@@ -4,7 +4,7 @@ import { z } from 'zod'
|
||||
import {
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request-helpers'
|
||||
} from '@/lib/copilot/request/http'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
|
||||
const logger = createLogger('CopilotTrainingAPI')
|
||||
|
||||
@@ -11,7 +11,6 @@ import {
|
||||
syncPersonalEnvCredentialsForUser,
|
||||
syncWorkspaceEnvCredentials,
|
||||
} from '@/lib/credentials/environment'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
|
||||
const logger = createLogger('CredentialByIdAPI')
|
||||
|
||||
@@ -237,17 +236,6 @@ export async function DELETE(
|
||||
envKeys: Object.keys(current),
|
||||
})
|
||||
|
||||
captureServerEvent(
|
||||
session.user.id,
|
||||
'credential_deleted',
|
||||
{
|
||||
credential_type: 'env_personal',
|
||||
provider_id: access.credential.envKey,
|
||||
workspace_id: access.credential.workspaceId,
|
||||
},
|
||||
{ groups: { workspace: access.credential.workspaceId } }
|
||||
)
|
||||
|
||||
return NextResponse.json({ success: true }, { status: 200 })
|
||||
}
|
||||
|
||||
@@ -290,33 +278,10 @@ export async function DELETE(
|
||||
actingUserId: session.user.id,
|
||||
})
|
||||
|
||||
captureServerEvent(
|
||||
session.user.id,
|
||||
'credential_deleted',
|
||||
{
|
||||
credential_type: 'env_workspace',
|
||||
provider_id: access.credential.envKey,
|
||||
workspace_id: access.credential.workspaceId,
|
||||
},
|
||||
{ groups: { workspace: access.credential.workspaceId } }
|
||||
)
|
||||
|
||||
return NextResponse.json({ success: true }, { status: 200 })
|
||||
}
|
||||
|
||||
await db.delete(credential).where(eq(credential.id, id))
|
||||
|
||||
captureServerEvent(
|
||||
session.user.id,
|
||||
'credential_deleted',
|
||||
{
|
||||
credential_type: access.credential.type as 'oauth' | 'service_account',
|
||||
provider_id: access.credential.providerId ?? id,
|
||||
workspace_id: access.credential.workspaceId,
|
||||
},
|
||||
{ groups: { workspace: access.credential.workspaceId } }
|
||||
)
|
||||
|
||||
return NextResponse.json({ success: true }, { status: 200 })
|
||||
} catch (error) {
|
||||
logger.error('Failed to delete credential', error)
|
||||
|
||||
@@ -10,7 +10,6 @@ import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { getWorkspaceMemberUserIds } from '@/lib/credentials/environment'
|
||||
import { syncWorkspaceOAuthCredentialsForUser } from '@/lib/credentials/oauth'
|
||||
import { getServiceConfigByProviderId } from '@/lib/oauth'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
|
||||
import { isValidEnvVarName } from '@/executor/constants'
|
||||
|
||||
@@ -601,16 +600,6 @@ export async function POST(request: NextRequest) {
|
||||
.where(eq(credential.id, credentialId))
|
||||
.limit(1)
|
||||
|
||||
captureServerEvent(
|
||||
session.user.id,
|
||||
'credential_connected',
|
||||
{ credential_type: type, provider_id: resolvedProviderId ?? type, workspace_id: workspaceId },
|
||||
{
|
||||
groups: { workspace: workspaceId },
|
||||
setOnce: { first_credential_connected_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
return NextResponse.json({ credential: created }, { status: 201 })
|
||||
} catch (error: any) {
|
||||
if (error?.code === '23505') {
|
||||
|
||||
@@ -75,6 +75,16 @@ vi.mock('@/lib/uploads/utils/file-utils', () => ({
|
||||
|
||||
vi.mock('@/lib/uploads/setup.server', () => ({}))
|
||||
|
||||
vi.mock('@/lib/execution/doc-vm', () => ({
|
||||
generatePdfFromCode: vi.fn().mockResolvedValue(Buffer.from('%PDF-compiled')),
|
||||
generateDocxFromCode: vi.fn().mockResolvedValue(Buffer.from('PK\x03\x04compiled')),
|
||||
generatePptxFromCode: vi.fn().mockResolvedValue(Buffer.from('PK\x03\x04compiled')),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/uploads/contexts/workspace/workspace-file-manager', () => ({
|
||||
parseWorkspaceFileKey: vi.fn().mockReturnValue(undefined),
|
||||
}))
|
||||
|
||||
vi.mock('@/app/api/files/utils', () => ({
|
||||
FileNotFoundError,
|
||||
createFileResponse: mockCreateFileResponse,
|
||||
|
||||
@@ -4,7 +4,11 @@ import { createLogger } from '@sim/logger'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { NextResponse } from 'next/server'
|
||||
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { generatePptxFromCode } from '@/lib/execution/pptx-vm'
|
||||
import {
|
||||
generateDocxFromCode,
|
||||
generatePdfFromCode,
|
||||
generatePptxFromCode,
|
||||
} from '@/lib/execution/doc-vm'
|
||||
import { CopilotFiles, isUsingCloudStorage } from '@/lib/uploads'
|
||||
import type { StorageContext } from '@/lib/uploads/config'
|
||||
import { parseWorkspaceFileKey } from '@/lib/uploads/contexts/workspace/workspace-file-manager'
|
||||
@@ -22,47 +26,73 @@ import {
|
||||
const logger = createLogger('FilesServeAPI')
|
||||
|
||||
const ZIP_MAGIC = Buffer.from([0x50, 0x4b, 0x03, 0x04])
|
||||
const PDF_MAGIC = Buffer.from([0x25, 0x50, 0x44, 0x46, 0x2d]) // %PDF-
|
||||
|
||||
const MAX_COMPILED_PPTX_CACHE = 10
|
||||
const compiledPptxCache = new Map<string, Buffer>()
|
||||
|
||||
function compiledCacheSet(key: string, buffer: Buffer): void {
|
||||
if (compiledPptxCache.size >= MAX_COMPILED_PPTX_CACHE) {
|
||||
compiledPptxCache.delete(compiledPptxCache.keys().next().value as string)
|
||||
}
|
||||
compiledPptxCache.set(key, buffer)
|
||||
interface CompilableFormat {
|
||||
magic: Buffer
|
||||
compile: (code: string, workspaceId: string) => Promise<Buffer>
|
||||
contentType: string
|
||||
}
|
||||
|
||||
async function compilePptxIfNeeded(
|
||||
const COMPILABLE_FORMATS: Record<string, CompilableFormat> = {
|
||||
'.pptx': {
|
||||
magic: ZIP_MAGIC,
|
||||
compile: generatePptxFromCode,
|
||||
contentType: 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
|
||||
},
|
||||
'.docx': {
|
||||
magic: ZIP_MAGIC,
|
||||
compile: generateDocxFromCode,
|
||||
contentType: 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
|
||||
},
|
||||
'.pdf': {
|
||||
magic: PDF_MAGIC,
|
||||
compile: generatePdfFromCode,
|
||||
contentType: 'application/pdf',
|
||||
},
|
||||
}
|
||||
|
||||
const MAX_COMPILED_DOC_CACHE = 10
|
||||
const compiledDocCache = new Map<string, Buffer>()
|
||||
|
||||
function compiledCacheSet(key: string, buffer: Buffer): void {
|
||||
if (compiledDocCache.size >= MAX_COMPILED_DOC_CACHE) {
|
||||
compiledDocCache.delete(compiledDocCache.keys().next().value as string)
|
||||
}
|
||||
compiledDocCache.set(key, buffer)
|
||||
}
|
||||
|
||||
async function compileDocumentIfNeeded(
|
||||
buffer: Buffer,
|
||||
filename: string,
|
||||
workspaceId?: string,
|
||||
raw?: boolean
|
||||
): Promise<{ buffer: Buffer; contentType: string }> {
|
||||
const isPptx = filename.toLowerCase().endsWith('.pptx')
|
||||
if (raw || !isPptx || buffer.subarray(0, 4).equals(ZIP_MAGIC)) {
|
||||
if (raw) return { buffer, contentType: getContentType(filename) }
|
||||
|
||||
const ext = filename.slice(filename.lastIndexOf('.')).toLowerCase()
|
||||
const format = COMPILABLE_FORMATS[ext]
|
||||
if (!format) return { buffer, contentType: getContentType(filename) }
|
||||
|
||||
const magicLen = format.magic.length
|
||||
if (buffer.length >= magicLen && buffer.subarray(0, magicLen).equals(format.magic)) {
|
||||
return { buffer, contentType: getContentType(filename) }
|
||||
}
|
||||
|
||||
const code = buffer.toString('utf-8')
|
||||
const cacheKey = createHash('sha256')
|
||||
.update(ext)
|
||||
.update(code)
|
||||
.update(workspaceId ?? '')
|
||||
.digest('hex')
|
||||
const cached = compiledPptxCache.get(cacheKey)
|
||||
const cached = compiledDocCache.get(cacheKey)
|
||||
if (cached) {
|
||||
return {
|
||||
buffer: cached,
|
||||
contentType: 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
|
||||
}
|
||||
return { buffer: cached, contentType: format.contentType }
|
||||
}
|
||||
|
||||
const compiled = await generatePptxFromCode(code, workspaceId || '')
|
||||
const compiled = await format.compile(code, workspaceId || '')
|
||||
compiledCacheSet(cacheKey, compiled)
|
||||
return {
|
||||
buffer: compiled,
|
||||
contentType: 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
|
||||
}
|
||||
return { buffer: compiled, contentType: format.contentType }
|
||||
}
|
||||
|
||||
const STORAGE_KEY_PREFIX_RE = /^\d{13}-[a-z0-9]{7}-/
|
||||
@@ -169,7 +199,7 @@ async function handleLocalFile(
|
||||
const segment = filename.split('/').pop() || filename
|
||||
const displayName = stripStorageKeyPrefix(segment)
|
||||
const workspaceId = getWorkspaceIdForCompile(filename)
|
||||
const { buffer: fileBuffer, contentType } = await compilePptxIfNeeded(
|
||||
const { buffer: fileBuffer, contentType } = await compileDocumentIfNeeded(
|
||||
rawBuffer,
|
||||
displayName,
|
||||
workspaceId,
|
||||
@@ -226,7 +256,7 @@ async function handleCloudProxy(
|
||||
const segment = cloudKey.split('/').pop() || 'download'
|
||||
const displayName = stripStorageKeyPrefix(segment)
|
||||
const workspaceId = getWorkspaceIdForCompile(cloudKey)
|
||||
const { buffer: fileBuffer, contentType } = await compilePptxIfNeeded(
|
||||
const { buffer: fileBuffer, contentType } = await compileDocumentIfNeeded(
|
||||
rawBuffer,
|
||||
displayName,
|
||||
workspaceId,
|
||||
|
||||
@@ -24,6 +24,27 @@ vi.mock('@/lib/auth/hybrid', () => ({
|
||||
|
||||
vi.mock('@/lib/execution/e2b', () => ({
|
||||
executeInE2B: mockExecuteInE2B,
|
||||
executeShellInE2B: vi.fn(),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/copilot/request/tools/files', () => ({
|
||||
FORMAT_TO_CONTENT_TYPE: {
|
||||
json: 'application/json',
|
||||
csv: 'text/csv',
|
||||
txt: 'text/plain',
|
||||
md: 'text/markdown',
|
||||
html: 'text/html',
|
||||
},
|
||||
normalizeOutputWorkspaceFileName: vi.fn((p: string) => p.replace(/^files\//, '')),
|
||||
resolveOutputFormat: vi.fn(() => 'json'),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/uploads/contexts/workspace/workspace-file-manager', () => ({
|
||||
uploadWorkspaceFile: vi.fn(),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/workflows/utils', () => ({
|
||||
getWorkflowById: vi.fn(),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/core/config/feature-flags', () => ({
|
||||
@@ -32,6 +53,7 @@ vi.mock('@/lib/core/config/feature-flags', () => ({
|
||||
isProd: false,
|
||||
isDev: false,
|
||||
isTest: true,
|
||||
isEmailVerificationEnabled: false,
|
||||
}))
|
||||
|
||||
import { validateProxyUrl } from '@/lib/core/security/input-validation'
|
||||
|
||||
@@ -1,11 +1,18 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { checkInternalAuth } from '@/lib/auth/hybrid'
|
||||
import {
|
||||
FORMAT_TO_CONTENT_TYPE,
|
||||
normalizeOutputWorkspaceFileName,
|
||||
resolveOutputFormat,
|
||||
} from '@/lib/copilot/request/tools/files'
|
||||
import { isE2bEnabled } from '@/lib/core/config/feature-flags'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { executeInE2B } from '@/lib/execution/e2b'
|
||||
import { executeInE2B, executeShellInE2B } from '@/lib/execution/e2b'
|
||||
import { executeInIsolatedVM } from '@/lib/execution/isolated-vm'
|
||||
import { CodeLanguage, DEFAULT_CODE_LANGUAGE, isValidCodeLanguage } from '@/lib/execution/languages'
|
||||
import { uploadWorkspaceFile } from '@/lib/uploads/contexts/workspace/workspace-file-manager'
|
||||
import { getWorkflowById } from '@/lib/workflows/utils'
|
||||
import { escapeRegExp, normalizeName, REFERENCE } from '@/executor/constants'
|
||||
import { type OutputSchema, resolveBlockReference } from '@/executor/utils/block-reference'
|
||||
import { formatLiteralForCode } from '@/executor/utils/code-formatting'
|
||||
@@ -580,6 +587,107 @@ function cleanStdout(stdout: string): string {
|
||||
return stdout
|
||||
}
|
||||
|
||||
async function maybeExportSandboxFileToWorkspace(args: {
|
||||
authUserId: string
|
||||
workflowId?: string
|
||||
workspaceId?: string
|
||||
outputPath?: string
|
||||
outputFormat?: string
|
||||
outputMimeType?: string
|
||||
outputSandboxPath?: string
|
||||
exportedFileContent?: string
|
||||
stdout: string
|
||||
executionTime: number
|
||||
}) {
|
||||
const {
|
||||
authUserId,
|
||||
workflowId,
|
||||
workspaceId,
|
||||
outputPath,
|
||||
outputFormat,
|
||||
outputMimeType,
|
||||
outputSandboxPath,
|
||||
exportedFileContent,
|
||||
stdout,
|
||||
executionTime,
|
||||
} = args
|
||||
|
||||
if (!outputSandboxPath) return null
|
||||
|
||||
if (!outputPath) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
error:
|
||||
'outputSandboxPath requires outputPath. Set outputPath to the destination workspace file, e.g. "files/result.csv".',
|
||||
output: { result: null, stdout: cleanStdout(stdout), executionTime },
|
||||
},
|
||||
{ status: 400 }
|
||||
)
|
||||
}
|
||||
|
||||
const resolvedWorkspaceId =
|
||||
workspaceId || (workflowId ? (await getWorkflowById(workflowId))?.workspaceId : undefined)
|
||||
|
||||
if (!resolvedWorkspaceId) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
error: 'Workspace context required to save sandbox file to workspace',
|
||||
output: { result: null, stdout: cleanStdout(stdout), executionTime },
|
||||
},
|
||||
{ status: 400 }
|
||||
)
|
||||
}
|
||||
|
||||
if (exportedFileContent === undefined) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
error: `Sandbox file "${outputSandboxPath}" was not found or could not be read`,
|
||||
output: { result: null, stdout: cleanStdout(stdout), executionTime },
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
}
|
||||
|
||||
const fileName = normalizeOutputWorkspaceFileName(outputPath)
|
||||
|
||||
const TEXT_MIMES = new Set(Object.values(FORMAT_TO_CONTENT_TYPE))
|
||||
const resolvedMimeType =
|
||||
outputMimeType ||
|
||||
FORMAT_TO_CONTENT_TYPE[resolveOutputFormat(fileName, outputFormat)] ||
|
||||
'application/octet-stream'
|
||||
const isBinary = !TEXT_MIMES.has(resolvedMimeType)
|
||||
const fileBuffer = isBinary
|
||||
? Buffer.from(exportedFileContent, 'base64')
|
||||
: Buffer.from(exportedFileContent, 'utf-8')
|
||||
|
||||
const uploaded = await uploadWorkspaceFile(
|
||||
resolvedWorkspaceId,
|
||||
authUserId,
|
||||
fileBuffer,
|
||||
fileName,
|
||||
resolvedMimeType
|
||||
)
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
output: {
|
||||
result: {
|
||||
message: `Sandbox file exported to files/${fileName}`,
|
||||
fileId: uploaded.id,
|
||||
fileName,
|
||||
downloadUrl: uploaded.url,
|
||||
sandboxPath: outputSandboxPath,
|
||||
},
|
||||
stdout: cleanStdout(stdout),
|
||||
executionTime,
|
||||
},
|
||||
resources: [{ type: 'file', id: uploaded.id, title: fileName }],
|
||||
})
|
||||
}
|
||||
|
||||
export async function POST(req: NextRequest) {
|
||||
const requestId = generateRequestId()
|
||||
const startTime = Date.now()
|
||||
@@ -603,12 +711,17 @@ export async function POST(req: NextRequest) {
|
||||
params = {},
|
||||
timeout = DEFAULT_EXECUTION_TIMEOUT_MS,
|
||||
language = DEFAULT_CODE_LANGUAGE,
|
||||
outputPath,
|
||||
outputFormat,
|
||||
outputMimeType,
|
||||
outputSandboxPath,
|
||||
envVars = {},
|
||||
blockData = {},
|
||||
blockNameMapping = {},
|
||||
blockOutputSchemas = {},
|
||||
workflowVariables = {},
|
||||
workflowId,
|
||||
workspaceId,
|
||||
isCustomTool = false,
|
||||
_sandboxFiles,
|
||||
} = body
|
||||
@@ -652,6 +765,83 @@ export async function POST(req: NextRequest) {
|
||||
hasImports = jsImports.trim().length > 0 || hasRequireStatements
|
||||
}
|
||||
|
||||
if (lang === CodeLanguage.Shell) {
|
||||
if (!isE2bEnabled) {
|
||||
throw new Error(
|
||||
'Shell execution requires E2B to be enabled. Please contact your administrator to enable E2B.'
|
||||
)
|
||||
}
|
||||
|
||||
const shellEnvs: Record<string, string> = {}
|
||||
for (const [k, v] of Object.entries(envVars)) {
|
||||
shellEnvs[k] = String(v)
|
||||
}
|
||||
for (const [k, v] of Object.entries(contextVariables)) {
|
||||
shellEnvs[k] = String(v)
|
||||
}
|
||||
|
||||
logger.info(`[${requestId}] E2B shell execution`, {
|
||||
enabled: isE2bEnabled,
|
||||
hasApiKey: Boolean(process.env.E2B_API_KEY),
|
||||
envVarCount: Object.keys(shellEnvs).length,
|
||||
})
|
||||
|
||||
const execStart = Date.now()
|
||||
const {
|
||||
result: shellResult,
|
||||
stdout: shellStdout,
|
||||
sandboxId,
|
||||
error: shellError,
|
||||
exportedFileContent,
|
||||
} = await executeShellInE2B({
|
||||
code: resolvedCode,
|
||||
envs: shellEnvs,
|
||||
timeoutMs: timeout,
|
||||
sandboxFiles: _sandboxFiles,
|
||||
outputSandboxPath,
|
||||
})
|
||||
const executionTime = Date.now() - execStart
|
||||
|
||||
logger.info(`[${requestId}] E2B shell sandbox`, {
|
||||
sandboxId,
|
||||
stdoutPreview: shellStdout?.slice(0, 200),
|
||||
error: shellError,
|
||||
executionTime,
|
||||
})
|
||||
|
||||
if (shellError) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
error: shellError,
|
||||
output: { result: null, stdout: cleanStdout(shellStdout), executionTime },
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
}
|
||||
|
||||
if (outputSandboxPath) {
|
||||
const fileExportResponse = await maybeExportSandboxFileToWorkspace({
|
||||
authUserId: auth.userId,
|
||||
workflowId,
|
||||
workspaceId,
|
||||
outputPath,
|
||||
outputFormat,
|
||||
outputMimeType,
|
||||
outputSandboxPath,
|
||||
exportedFileContent,
|
||||
stdout: shellStdout,
|
||||
executionTime,
|
||||
})
|
||||
if (fileExportResponse) return fileExportResponse
|
||||
}
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
output: { result: shellResult ?? null, stdout: cleanStdout(shellStdout), executionTime },
|
||||
})
|
||||
}
|
||||
|
||||
if (lang === CodeLanguage.Python && !isE2bEnabled) {
|
||||
throw new Error(
|
||||
'Python execution requires E2B to be enabled. Please contact your administrator to enable E2B, or use JavaScript instead.'
|
||||
@@ -719,11 +909,13 @@ export async function POST(req: NextRequest) {
|
||||
stdout: e2bStdout,
|
||||
sandboxId,
|
||||
error: e2bError,
|
||||
exportedFileContent,
|
||||
} = await executeInE2B({
|
||||
code: codeForE2B,
|
||||
language: CodeLanguage.JavaScript,
|
||||
timeoutMs: timeout,
|
||||
sandboxFiles: _sandboxFiles,
|
||||
outputSandboxPath,
|
||||
})
|
||||
const executionTime = Date.now() - execStart
|
||||
stdout += e2bStdout
|
||||
@@ -752,6 +944,22 @@ export async function POST(req: NextRequest) {
|
||||
)
|
||||
}
|
||||
|
||||
if (outputSandboxPath) {
|
||||
const fileExportResponse = await maybeExportSandboxFileToWorkspace({
|
||||
authUserId: auth.userId,
|
||||
workflowId,
|
||||
workspaceId,
|
||||
outputPath,
|
||||
outputFormat,
|
||||
outputMimeType,
|
||||
outputSandboxPath,
|
||||
exportedFileContent,
|
||||
stdout,
|
||||
executionTime,
|
||||
})
|
||||
if (fileExportResponse) return fileExportResponse
|
||||
}
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
output: { result: e2bResult ?? null, stdout: cleanStdout(stdout), executionTime },
|
||||
@@ -783,11 +991,13 @@ export async function POST(req: NextRequest) {
|
||||
stdout: e2bStdout,
|
||||
sandboxId,
|
||||
error: e2bError,
|
||||
exportedFileContent,
|
||||
} = await executeInE2B({
|
||||
code: codeForE2B,
|
||||
language: CodeLanguage.Python,
|
||||
timeoutMs: timeout,
|
||||
sandboxFiles: _sandboxFiles,
|
||||
outputSandboxPath,
|
||||
})
|
||||
const executionTime = Date.now() - execStart
|
||||
stdout += e2bStdout
|
||||
@@ -816,6 +1026,22 @@ export async function POST(req: NextRequest) {
|
||||
)
|
||||
}
|
||||
|
||||
if (outputSandboxPath) {
|
||||
const fileExportResponse = await maybeExportSandboxFileToWorkspace({
|
||||
authUserId: auth.userId,
|
||||
workflowId,
|
||||
workspaceId,
|
||||
outputPath,
|
||||
outputFormat,
|
||||
outputMimeType,
|
||||
outputSandboxPath,
|
||||
exportedFileContent,
|
||||
stdout,
|
||||
executionTime,
|
||||
})
|
||||
if (fileExportResponse) return fileExportResponse
|
||||
}
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
output: { result: e2bResult ?? null, stdout: cleanStdout(stdout), executionTime },
|
||||
|
||||
@@ -4,19 +4,13 @@
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
const {
|
||||
mockCheckHybridAuth,
|
||||
mockGetDispatchJobRecord,
|
||||
mockGetJobQueue,
|
||||
mockVerifyWorkflowAccess,
|
||||
mockGetWorkflowById,
|
||||
} = vi.hoisted(() => ({
|
||||
mockCheckHybridAuth: vi.fn(),
|
||||
mockGetDispatchJobRecord: vi.fn(),
|
||||
mockGetJobQueue: vi.fn(),
|
||||
mockVerifyWorkflowAccess: vi.fn(),
|
||||
mockGetWorkflowById: vi.fn(),
|
||||
}))
|
||||
const { mockCheckHybridAuth, mockGetJobQueue, mockVerifyWorkflowAccess, mockGetWorkflowById } =
|
||||
vi.hoisted(() => ({
|
||||
mockCheckHybridAuth: vi.fn(),
|
||||
mockGetJobQueue: vi.fn(),
|
||||
mockVerifyWorkflowAccess: vi.fn(),
|
||||
mockGetWorkflowById: vi.fn(),
|
||||
}))
|
||||
|
||||
vi.mock('@sim/logger', () => ({
|
||||
createLogger: () => ({
|
||||
@@ -32,19 +26,9 @@ vi.mock('@/lib/auth/hybrid', () => ({
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/core/async-jobs', () => ({
|
||||
JOB_STATUS: {
|
||||
PENDING: 'pending',
|
||||
PROCESSING: 'processing',
|
||||
COMPLETED: 'completed',
|
||||
FAILED: 'failed',
|
||||
},
|
||||
getJobQueue: mockGetJobQueue,
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/core/workspace-dispatch/store', () => ({
|
||||
getDispatchJobRecord: mockGetDispatchJobRecord,
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/core/utils/request', () => ({
|
||||
generateRequestId: vi.fn().mockReturnValue('request-1'),
|
||||
}))
|
||||
@@ -89,72 +73,78 @@ describe('GET /api/jobs/[jobId]', () => {
|
||||
})
|
||||
})
|
||||
|
||||
it('returns dispatcher-aware waiting status with metadata', async () => {
|
||||
mockGetDispatchJobRecord.mockResolvedValue({
|
||||
id: 'dispatch-1',
|
||||
workspaceId: 'workspace-1',
|
||||
lane: 'runtime',
|
||||
queueName: 'workflow-execution',
|
||||
bullmqJobName: 'workflow-execution',
|
||||
bullmqPayload: {},
|
||||
metadata: {
|
||||
workflowId: 'workflow-1',
|
||||
},
|
||||
priority: 10,
|
||||
status: 'waiting',
|
||||
createdAt: 1000,
|
||||
admittedAt: 2000,
|
||||
it('returns pending status for a queued job', async () => {
|
||||
mockGetJobQueue.mockResolvedValue({
|
||||
getJob: vi.fn().mockResolvedValue({
|
||||
id: 'job-1',
|
||||
type: 'workflow-execution',
|
||||
payload: {},
|
||||
status: 'pending',
|
||||
createdAt: new Date('2025-01-01T00:00:00Z'),
|
||||
attempts: 0,
|
||||
maxAttempts: 1,
|
||||
metadata: {
|
||||
workflowId: 'workflow-1',
|
||||
},
|
||||
}),
|
||||
})
|
||||
|
||||
const response = await GET(createMockRequest(), {
|
||||
params: Promise.resolve({ jobId: 'dispatch-1' }),
|
||||
params: Promise.resolve({ jobId: 'job-1' }),
|
||||
})
|
||||
const body = await response.json()
|
||||
|
||||
expect(response.status).toBe(200)
|
||||
expect(body.status).toBe('waiting')
|
||||
expect(body.metadata.queueName).toBe('workflow-execution')
|
||||
expect(body.metadata.lane).toBe('runtime')
|
||||
expect(body.metadata.workspaceId).toBe('workspace-1')
|
||||
expect(body.status).toBe('pending')
|
||||
})
|
||||
|
||||
it('returns completed output from dispatch state', async () => {
|
||||
mockGetDispatchJobRecord.mockResolvedValue({
|
||||
id: 'dispatch-2',
|
||||
workspaceId: 'workspace-1',
|
||||
lane: 'interactive',
|
||||
queueName: 'workflow-execution',
|
||||
bullmqJobName: 'direct-workflow-execution',
|
||||
bullmqPayload: {},
|
||||
metadata: {
|
||||
workflowId: 'workflow-1',
|
||||
},
|
||||
priority: 1,
|
||||
status: 'completed',
|
||||
createdAt: 1000,
|
||||
startedAt: 2000,
|
||||
completedAt: 7000,
|
||||
output: { success: true },
|
||||
it('returns completed output from job', async () => {
|
||||
mockGetJobQueue.mockResolvedValue({
|
||||
getJob: vi.fn().mockResolvedValue({
|
||||
id: 'job-2',
|
||||
type: 'workflow-execution',
|
||||
payload: {},
|
||||
status: 'completed',
|
||||
createdAt: new Date('2025-01-01T00:00:00Z'),
|
||||
startedAt: new Date('2025-01-01T00:00:01Z'),
|
||||
completedAt: new Date('2025-01-01T00:00:06Z'),
|
||||
attempts: 1,
|
||||
maxAttempts: 1,
|
||||
output: { success: true },
|
||||
metadata: {
|
||||
workflowId: 'workflow-1',
|
||||
},
|
||||
}),
|
||||
})
|
||||
|
||||
const response = await GET(createMockRequest(), {
|
||||
params: Promise.resolve({ jobId: 'dispatch-2' }),
|
||||
params: Promise.resolve({ jobId: 'job-2' }),
|
||||
})
|
||||
const body = await response.json()
|
||||
|
||||
expect(response.status).toBe(200)
|
||||
expect(body.status).toBe('completed')
|
||||
expect(body.output).toEqual({ success: true })
|
||||
expect(body.metadata.duration).toBe(5000)
|
||||
})
|
||||
|
||||
it('returns 404 when neither dispatch nor BullMQ job exists', async () => {
|
||||
mockGetDispatchJobRecord.mockResolvedValue(null)
|
||||
|
||||
it('returns 404 when job does not exist', async () => {
|
||||
const response = await GET(createMockRequest(), {
|
||||
params: Promise.resolve({ jobId: 'missing-job' }),
|
||||
})
|
||||
|
||||
expect(response.status).toBe(404)
|
||||
})
|
||||
|
||||
it('returns 401 for unauthenticated requests', async () => {
|
||||
mockCheckHybridAuth.mockResolvedValue({
|
||||
success: false,
|
||||
error: 'Not authenticated',
|
||||
})
|
||||
|
||||
const response = await GET(createMockRequest(), {
|
||||
params: Promise.resolve({ jobId: 'job-1' }),
|
||||
})
|
||||
|
||||
expect(response.status).toBe(401)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -2,13 +2,27 @@ import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { checkHybridAuth } from '@/lib/auth/hybrid'
|
||||
import { getJobQueue } from '@/lib/core/async-jobs'
|
||||
import type { Job } from '@/lib/core/async-jobs/types'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { presentDispatchOrJobStatus } from '@/lib/core/workspace-dispatch/status'
|
||||
import { getDispatchJobRecord } from '@/lib/core/workspace-dispatch/store'
|
||||
import { createErrorResponse } from '@/app/api/workflows/utils'
|
||||
|
||||
const logger = createLogger('TaskStatusAPI')
|
||||
|
||||
function presentJobStatus(job: Job) {
|
||||
return {
|
||||
status: job.status,
|
||||
metadata: {
|
||||
createdAt: job.createdAt.toISOString(),
|
||||
startedAt: job.startedAt?.toISOString(),
|
||||
completedAt: job.completedAt?.toISOString(),
|
||||
attempts: job.attempts,
|
||||
maxAttempts: job.maxAttempts,
|
||||
},
|
||||
output: job.output,
|
||||
error: job.error,
|
||||
}
|
||||
}
|
||||
|
||||
export async function GET(
|
||||
request: NextRequest,
|
||||
{ params }: { params: Promise<{ jobId: string }> }
|
||||
@@ -25,15 +39,14 @@ export async function GET(
|
||||
|
||||
const authenticatedUserId = authResult.userId
|
||||
|
||||
const dispatchJob = await getDispatchJobRecord(taskId)
|
||||
const jobQueue = await getJobQueue()
|
||||
const job = dispatchJob ? null : await jobQueue.getJob(taskId)
|
||||
const job = await jobQueue.getJob(taskId)
|
||||
|
||||
if (!job && !dispatchJob) {
|
||||
if (!job) {
|
||||
return createErrorResponse('Task not found', 404)
|
||||
}
|
||||
|
||||
const metadataToCheck = dispatchJob?.metadata ?? job?.metadata
|
||||
const metadataToCheck = job.metadata
|
||||
|
||||
if (metadataToCheck?.workflowId) {
|
||||
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
|
||||
@@ -61,7 +74,7 @@ export async function GET(
|
||||
return createErrorResponse('Access denied', 403)
|
||||
}
|
||||
|
||||
const presented = presentDispatchOrJobStatus(dispatchJob, job)
|
||||
const presented = presentJobStatus(job)
|
||||
const response: any = {
|
||||
success: true,
|
||||
taskId,
|
||||
@@ -71,9 +84,6 @@ export async function GET(
|
||||
|
||||
if (presented.output !== undefined) response.output = presented.output
|
||||
if (presented.error !== undefined) response.error = presented.error
|
||||
if (presented.estimatedDuration !== undefined) {
|
||||
response.estimatedDuration = presented.estimatedDuration
|
||||
}
|
||||
|
||||
return NextResponse.json(response)
|
||||
} catch (error: any) {
|
||||
|
||||
@@ -16,7 +16,6 @@ import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { deleteDocumentStorageFiles } from '@/lib/knowledge/documents/service'
|
||||
import { cleanupUnusedTagDefinitions } from '@/lib/knowledge/tags/service'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/utils'
|
||||
import { checkKnowledgeBaseAccess, checkKnowledgeBaseWriteAccess } from '@/app/api/knowledge/utils'
|
||||
import { CONNECTOR_REGISTRY } from '@/connectors/registry'
|
||||
@@ -352,19 +351,6 @@ export async function DELETE(request: NextRequest, { params }: RouteParams) {
|
||||
`[${requestId}] Deleted connector ${connectorId}${deleteDocuments ? ` and ${docCount} documents` : `, kept ${docCount} documents`}`
|
||||
)
|
||||
|
||||
const kbWorkspaceId = writeCheck.knowledgeBase.workspaceId ?? ''
|
||||
captureServerEvent(
|
||||
auth.userId,
|
||||
'knowledge_base_connector_removed',
|
||||
{
|
||||
knowledge_base_id: knowledgeBaseId,
|
||||
workspace_id: kbWorkspaceId,
|
||||
connector_type: existingConnector[0].connectorType,
|
||||
documents_deleted: deleteDocuments ? docCount : 0,
|
||||
},
|
||||
kbWorkspaceId ? { groups: { workspace: kbWorkspaceId } } : undefined
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId: writeCheck.knowledgeBase.workspaceId,
|
||||
actorId: auth.userId,
|
||||
|
||||
@@ -7,7 +7,6 @@ import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { dispatchSync } from '@/lib/knowledge/connectors/sync-engine'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { checkKnowledgeBaseWriteAccess } from '@/app/api/knowledge/utils'
|
||||
|
||||
const logger = createLogger('ConnectorManualSyncAPI')
|
||||
@@ -56,18 +55,6 @@ export async function POST(request: NextRequest, { params }: RouteParams) {
|
||||
|
||||
logger.info(`[${requestId}] Manual sync triggered for connector ${connectorId}`)
|
||||
|
||||
const kbWorkspaceId = writeCheck.knowledgeBase.workspaceId ?? ''
|
||||
captureServerEvent(
|
||||
auth.userId,
|
||||
'knowledge_base_connector_synced',
|
||||
{
|
||||
knowledge_base_id: knowledgeBaseId,
|
||||
workspace_id: kbWorkspaceId,
|
||||
connector_type: connectorRows[0].connectorType,
|
||||
},
|
||||
kbWorkspaceId ? { groups: { workspace: kbWorkspaceId } } : undefined
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId: writeCheck.knowledgeBase.workspaceId,
|
||||
actorId: auth.userId,
|
||||
|
||||
@@ -11,7 +11,6 @@ import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { dispatchSync } from '@/lib/knowledge/connectors/sync-engine'
|
||||
import { allocateTagSlots } from '@/lib/knowledge/constants'
|
||||
import { createTagDefinition } from '@/lib/knowledge/tags/service'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { getCredential } from '@/app/api/auth/oauth/utils'
|
||||
import { checkKnowledgeBaseAccess, checkKnowledgeBaseWriteAccess } from '@/app/api/knowledge/utils'
|
||||
import { CONNECTOR_REGISTRY } from '@/connectors/registry'
|
||||
@@ -228,22 +227,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
|
||||
logger.info(`[${requestId}] Created connector ${connectorId} for KB ${knowledgeBaseId}`)
|
||||
|
||||
const kbWorkspaceId = writeCheck.knowledgeBase.workspaceId ?? ''
|
||||
captureServerEvent(
|
||||
auth.userId,
|
||||
'knowledge_base_connector_added',
|
||||
{
|
||||
knowledge_base_id: knowledgeBaseId,
|
||||
workspace_id: kbWorkspaceId,
|
||||
connector_type: connectorType,
|
||||
sync_interval_minutes: syncIntervalMinutes,
|
||||
},
|
||||
{
|
||||
groups: kbWorkspaceId ? { workspace: kbWorkspaceId } : undefined,
|
||||
setOnce: { first_connector_added_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId: writeCheck.knowledgeBase.workspaceId,
|
||||
actorId: auth.userId,
|
||||
|
||||
@@ -16,7 +16,6 @@ import {
|
||||
type TagFilterCondition,
|
||||
} from '@/lib/knowledge/documents/service'
|
||||
import type { DocumentSortField, SortOrder } from '@/lib/knowledge/documents/types'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
|
||||
import { checkKnowledgeBaseAccess, checkKnowledgeBaseWriteAccess } from '@/app/api/knowledge/utils'
|
||||
|
||||
@@ -215,8 +214,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
|
||||
}
|
||||
|
||||
const kbWorkspaceId = accessCheck.knowledgeBase?.workspaceId
|
||||
|
||||
if (body.bulk === true) {
|
||||
try {
|
||||
const validatedData = BulkCreateDocumentsSchema.parse(body)
|
||||
@@ -243,21 +240,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
// Silently fail
|
||||
}
|
||||
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'knowledge_base_document_uploaded',
|
||||
{
|
||||
knowledge_base_id: knowledgeBaseId,
|
||||
workspace_id: kbWorkspaceId ?? '',
|
||||
document_count: createdDocuments.length,
|
||||
upload_type: 'bulk',
|
||||
},
|
||||
{
|
||||
...(kbWorkspaceId ? { groups: { workspace: kbWorkspaceId } } : {}),
|
||||
setOnce: { first_document_uploaded_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
processDocumentsWithQueue(
|
||||
createdDocuments,
|
||||
knowledgeBaseId,
|
||||
@@ -332,21 +314,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
// Silently fail
|
||||
}
|
||||
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'knowledge_base_document_uploaded',
|
||||
{
|
||||
knowledge_base_id: knowledgeBaseId,
|
||||
workspace_id: kbWorkspaceId ?? '',
|
||||
document_count: 1,
|
||||
upload_type: 'single',
|
||||
},
|
||||
{
|
||||
...(kbWorkspaceId ? { groups: { workspace: kbWorkspaceId } } : {}),
|
||||
setOnce: { first_document_uploaded_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId: accessCheck.knowledgeBase?.workspaceId ?? null,
|
||||
actorId: userId,
|
||||
|
||||
@@ -11,7 +11,6 @@ import {
|
||||
KnowledgeBaseConflictError,
|
||||
type KnowledgeBaseScope,
|
||||
} from '@/lib/knowledge/service'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
|
||||
const logger = createLogger('KnowledgeBaseAPI')
|
||||
|
||||
@@ -116,20 +115,6 @@ export async function POST(req: NextRequest) {
|
||||
// Telemetry should not fail the operation
|
||||
}
|
||||
|
||||
captureServerEvent(
|
||||
session.user.id,
|
||||
'knowledge_base_created',
|
||||
{
|
||||
knowledge_base_id: newKnowledgeBase.id,
|
||||
workspace_id: validatedData.workspaceId,
|
||||
name: validatedData.name,
|
||||
},
|
||||
{
|
||||
groups: { workspace: validatedData.workspaceId },
|
||||
setOnce: { first_kb_created_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
logger.info(
|
||||
`[${requestId}] Knowledge base created: ${newKnowledgeBase.id} for user ${session.user.id}`
|
||||
)
|
||||
|
||||
@@ -18,14 +18,11 @@ import { eq, sql } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { validateOAuthAccessToken } from '@/lib/auth/oauth-token'
|
||||
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
|
||||
import { createRunSegment } from '@/lib/copilot/async-runs/repository'
|
||||
import { ORCHESTRATION_TIMEOUT_MS, SIM_AGENT_API_URL } from '@/lib/copilot/constants'
|
||||
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
|
||||
import { orchestrateSubagentStream } from '@/lib/copilot/orchestrator/subagent'
|
||||
import {
|
||||
executeToolServerSide,
|
||||
prepareExecutionContext,
|
||||
} from '@/lib/copilot/orchestrator/tool-executor'
|
||||
import { runCopilotLifecycle } from '@/lib/copilot/request/lifecycle/run'
|
||||
import { orchestrateSubagentStream } from '@/lib/copilot/request/subagent'
|
||||
import { ensureHandlersRegistered, executeTool } from '@/lib/copilot/tool-executor'
|
||||
import { prepareExecutionContext } from '@/lib/copilot/tools/handlers/context'
|
||||
import { DIRECT_TOOL_DEFS, SUBAGENT_TOOL_DEFS } from '@/lib/copilot/tools/mcp/definitions'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import { RateLimiter } from '@/lib/core/rate-limiter'
|
||||
@@ -645,7 +642,8 @@ async function handleDirectToolCall(
|
||||
startTime: Date.now(),
|
||||
}
|
||||
|
||||
const result = await executeToolServerSide(toolCall, execContext)
|
||||
ensureHandlersRegistered()
|
||||
const result = await executeTool(toolCall.name, toolCall.params || {}, execContext)
|
||||
|
||||
return {
|
||||
content: [
|
||||
@@ -728,25 +726,10 @@ async function handleBuildToolCall(
|
||||
chatId,
|
||||
}
|
||||
|
||||
const executionId = crypto.randomUUID()
|
||||
const runId = crypto.randomUUID()
|
||||
const messageId = requestPayload.messageId as string
|
||||
|
||||
await createRunSegment({
|
||||
id: runId,
|
||||
executionId,
|
||||
chatId,
|
||||
userId,
|
||||
workflowId: resolved.workflowId,
|
||||
streamId: messageId,
|
||||
}).catch(() => {})
|
||||
|
||||
const result = await orchestrateCopilotStream(requestPayload, {
|
||||
const result = await runCopilotLifecycle(requestPayload, {
|
||||
userId,
|
||||
workflowId: resolved.workflowId,
|
||||
chatId,
|
||||
executionId,
|
||||
runId,
|
||||
goRoute: '/api/mcp',
|
||||
autoExecuteTools: true,
|
||||
timeout: ORCHESTRATION_TIMEOUT_MS,
|
||||
|
||||
@@ -18,7 +18,6 @@ import {
|
||||
createMcpSuccessResponse,
|
||||
generateMcpServerId,
|
||||
} from '@/lib/mcp/utils'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
|
||||
const logger = createLogger('McpServersAPI')
|
||||
|
||||
@@ -181,20 +180,6 @@ export const POST = withMcpAuth('write')(
|
||||
// Silently fail
|
||||
}
|
||||
|
||||
const sourceParam = body.source as string | undefined
|
||||
const source =
|
||||
sourceParam === 'settings' || sourceParam === 'tool_input' ? sourceParam : undefined
|
||||
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'mcp_server_connected',
|
||||
{ workspace_id: workspaceId, server_name: body.name, transport: body.transport, source },
|
||||
{
|
||||
groups: { workspace: workspaceId },
|
||||
setOnce: { first_mcp_connected_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId,
|
||||
actorId: userId,
|
||||
@@ -229,9 +214,6 @@ export const DELETE = withMcpAuth('admin')(
|
||||
try {
|
||||
const { searchParams } = new URL(request.url)
|
||||
const serverId = searchParams.get('serverId')
|
||||
const sourceParam = searchParams.get('source')
|
||||
const source =
|
||||
sourceParam === 'settings' || sourceParam === 'tool_input' ? sourceParam : undefined
|
||||
|
||||
if (!serverId) {
|
||||
return createMcpErrorResponse(
|
||||
@@ -260,13 +242,6 @@ export const DELETE = withMcpAuth('admin')(
|
||||
|
||||
logger.info(`[${requestId}] Successfully deleted MCP server: ${serverId}`)
|
||||
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'mcp_server_disconnected',
|
||||
{ workspace_id: workspaceId, server_name: deletedServer.name, source },
|
||||
{ groups: { workspace: workspaceId } }
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId,
|
||||
actorId: userId,
|
||||
|
||||
@@ -5,18 +5,26 @@ import { eq, sql } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
|
||||
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
|
||||
import { resolveOrCreateChat } from '@/lib/copilot/chat/lifecycle'
|
||||
import { buildCopilotRequestPayload } from '@/lib/copilot/chat/payload'
|
||||
import {
|
||||
buildPersistedAssistantMessage,
|
||||
buildPersistedUserMessage,
|
||||
} from '@/lib/copilot/chat/persisted-message'
|
||||
import {
|
||||
processContextsServer,
|
||||
resolveActiveResourceContext,
|
||||
} from '@/lib/copilot/chat/process-contents'
|
||||
import { generateWorkspaceContext } from '@/lib/copilot/chat/workspace-context'
|
||||
import { createRequestTracker, createUnauthorizedResponse } from '@/lib/copilot/request/http'
|
||||
import { createSSEStream, SSE_RESPONSE_HEADERS } from '@/lib/copilot/request/lifecycle/start'
|
||||
import {
|
||||
acquirePendingChatStream,
|
||||
createSSEStream,
|
||||
SSE_RESPONSE_HEADERS,
|
||||
} from '@/lib/copilot/chat-streaming'
|
||||
import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types'
|
||||
import { processContextsServer, resolveActiveResourceContext } from '@/lib/copilot/process-contents'
|
||||
import { createRequestTracker, createUnauthorizedResponse } from '@/lib/copilot/request-helpers'
|
||||
import { taskPubSub } from '@/lib/copilot/task-events'
|
||||
import { generateWorkspaceContext } from '@/lib/copilot/workspace-context'
|
||||
getPendingChatStreamId,
|
||||
releasePendingChatStream,
|
||||
} from '@/lib/copilot/request/session'
|
||||
import type { OrchestratorResult } from '@/lib/copilot/request/types'
|
||||
import { taskPubSub } from '@/lib/copilot/tasks'
|
||||
import {
|
||||
assertActiveWorkspaceAccess,
|
||||
getUserEntityPermissions,
|
||||
@@ -37,7 +45,6 @@ const FileAttachmentSchema = z.object({
|
||||
const ResourceAttachmentSchema = z.object({
|
||||
type: z.enum(['workflow', 'table', 'file', 'knowledgebase']),
|
||||
id: z.string().min(1),
|
||||
title: z.string().optional(),
|
||||
active: z.boolean().optional(),
|
||||
})
|
||||
|
||||
@@ -87,7 +94,9 @@ const MothershipMessageSchema = z.object({
|
||||
*/
|
||||
export async function POST(req: NextRequest) {
|
||||
const tracker = createRequestTracker()
|
||||
let userMessageIdForLogs: string | undefined
|
||||
let lockChatId: string | undefined
|
||||
let lockStreamId = ''
|
||||
let chatStreamLockAcquired = false
|
||||
|
||||
try {
|
||||
const session = await getSession()
|
||||
@@ -110,27 +119,23 @@ export async function POST(req: NextRequest) {
|
||||
} = MothershipMessageSchema.parse(body)
|
||||
|
||||
const userMessageId = providedMessageId || crypto.randomUUID()
|
||||
userMessageIdForLogs = userMessageId
|
||||
const reqLogger = logger.withMetadata({
|
||||
requestId: tracker.requestId,
|
||||
messageId: userMessageId,
|
||||
})
|
||||
lockStreamId = userMessageId
|
||||
|
||||
reqLogger.info('Received mothership chat start request', {
|
||||
workspaceId,
|
||||
chatId,
|
||||
createNewChat,
|
||||
hasContexts: Array.isArray(contexts) && contexts.length > 0,
|
||||
contextsCount: Array.isArray(contexts) ? contexts.length : 0,
|
||||
hasResourceAttachments: Array.isArray(resourceAttachments) && resourceAttachments.length > 0,
|
||||
resourceAttachmentCount: Array.isArray(resourceAttachments) ? resourceAttachments.length : 0,
|
||||
hasFileAttachments: Array.isArray(fileAttachments) && fileAttachments.length > 0,
|
||||
fileAttachmentCount: Array.isArray(fileAttachments) ? fileAttachments.length : 0,
|
||||
})
|
||||
// Phase 1: workspace access + chat resolution in parallel
|
||||
const [accessResult, chatResult] = await Promise.allSettled([
|
||||
assertActiveWorkspaceAccess(workspaceId, authenticatedUserId),
|
||||
chatId || createNewChat
|
||||
? resolveOrCreateChat({
|
||||
chatId,
|
||||
userId: authenticatedUserId,
|
||||
workspaceId,
|
||||
model: 'claude-opus-4-6',
|
||||
type: 'mothership',
|
||||
})
|
||||
: null,
|
||||
])
|
||||
|
||||
try {
|
||||
await assertActiveWorkspaceAccess(workspaceId, authenticatedUserId)
|
||||
} catch {
|
||||
if (accessResult.status === 'rejected') {
|
||||
return NextResponse.json({ error: 'Workspace not found or access denied' }, { status: 403 })
|
||||
}
|
||||
|
||||
@@ -138,18 +143,12 @@ export async function POST(req: NextRequest) {
|
||||
let conversationHistory: any[] = []
|
||||
let actualChatId = chatId
|
||||
|
||||
if (chatId || createNewChat) {
|
||||
const chatResult = await resolveOrCreateChat({
|
||||
chatId,
|
||||
userId: authenticatedUserId,
|
||||
workspaceId,
|
||||
model: 'claude-opus-4-6',
|
||||
type: 'mothership',
|
||||
})
|
||||
currentChat = chatResult.chat
|
||||
actualChatId = chatResult.chatId || chatId
|
||||
conversationHistory = Array.isArray(chatResult.conversationHistory)
|
||||
? chatResult.conversationHistory
|
||||
if (chatResult.status === 'fulfilled' && chatResult.value) {
|
||||
const resolved = chatResult.value
|
||||
currentChat = resolved.chat
|
||||
actualChatId = resolved.chatId || chatId
|
||||
conversationHistory = Array.isArray(resolved.conversationHistory)
|
||||
? resolved.conversationHistory
|
||||
: []
|
||||
|
||||
if (chatId && !currentChat) {
|
||||
@@ -157,76 +156,73 @@ export async function POST(req: NextRequest) {
|
||||
}
|
||||
}
|
||||
|
||||
let agentContexts: Array<{ type: string; content: string }> = []
|
||||
if (Array.isArray(contexts) && contexts.length > 0) {
|
||||
try {
|
||||
agentContexts = await processContextsServer(
|
||||
contexts as any,
|
||||
authenticatedUserId,
|
||||
message,
|
||||
workspaceId,
|
||||
actualChatId
|
||||
if (actualChatId) {
|
||||
chatStreamLockAcquired = await acquirePendingChatStream(actualChatId, userMessageId)
|
||||
if (!chatStreamLockAcquired) {
|
||||
const activeStreamId = await getPendingChatStreamId(actualChatId)
|
||||
return NextResponse.json(
|
||||
{
|
||||
error: 'A response is already in progress for this chat.',
|
||||
...(activeStreamId ? { activeStreamId } : {}),
|
||||
},
|
||||
{ status: 409 }
|
||||
)
|
||||
} catch (e) {
|
||||
reqLogger.error('Failed to process contexts', e)
|
||||
}
|
||||
lockChatId = actualChatId
|
||||
}
|
||||
|
||||
if (Array.isArray(resourceAttachments) && resourceAttachments.length > 0) {
|
||||
const results = await Promise.allSettled(
|
||||
resourceAttachments.map(async (r) => {
|
||||
const ctx = await resolveActiveResourceContext(
|
||||
r.type,
|
||||
r.id,
|
||||
workspaceId,
|
||||
// Phase 2: contexts + workspace context + user message persistence in parallel
|
||||
const contextPromise = (async () => {
|
||||
let agentCtxs: Array<{ type: string; content: string }> = []
|
||||
if (Array.isArray(contexts) && contexts.length > 0) {
|
||||
try {
|
||||
agentCtxs = await processContextsServer(
|
||||
contexts as any,
|
||||
authenticatedUserId,
|
||||
message,
|
||||
workspaceId,
|
||||
actualChatId
|
||||
)
|
||||
if (!ctx) return null
|
||||
return {
|
||||
...ctx,
|
||||
tag: r.active ? '@active_tab' : '@open_tab',
|
||||
}
|
||||
})
|
||||
)
|
||||
for (const result of results) {
|
||||
if (result.status === 'fulfilled' && result.value) {
|
||||
agentContexts.push(result.value)
|
||||
} else if (result.status === 'rejected') {
|
||||
reqLogger.error('Failed to resolve resource attachment', result.reason)
|
||||
} catch (e) {
|
||||
logger.error(`[${tracker.requestId}] Failed to process contexts`, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (actualChatId) {
|
||||
const userMsg = {
|
||||
id: userMessageId,
|
||||
role: 'user' as const,
|
||||
content: message,
|
||||
timestamp: new Date().toISOString(),
|
||||
...(fileAttachments &&
|
||||
fileAttachments.length > 0 && {
|
||||
fileAttachments: fileAttachments.map((f) => ({
|
||||
id: f.id,
|
||||
key: f.key,
|
||||
filename: f.filename,
|
||||
media_type: f.media_type,
|
||||
size: f.size,
|
||||
})),
|
||||
}),
|
||||
...(contexts &&
|
||||
contexts.length > 0 && {
|
||||
contexts: contexts.map((c) => ({
|
||||
kind: c.kind,
|
||||
label: c.label,
|
||||
...(c.workflowId && { workflowId: c.workflowId }),
|
||||
...(c.knowledgeId && { knowledgeId: c.knowledgeId }),
|
||||
...(c.tableId && { tableId: c.tableId }),
|
||||
...(c.fileId && { fileId: c.fileId }),
|
||||
})),
|
||||
}),
|
||||
if (Array.isArray(resourceAttachments) && resourceAttachments.length > 0) {
|
||||
const results = await Promise.allSettled(
|
||||
resourceAttachments.map(async (r) => {
|
||||
const ctx = await resolveActiveResourceContext(
|
||||
r.type,
|
||||
r.id,
|
||||
workspaceId,
|
||||
authenticatedUserId,
|
||||
actualChatId
|
||||
)
|
||||
if (!ctx) return null
|
||||
return { ...ctx, tag: r.active ? '@active_tab' : '@open_tab' }
|
||||
})
|
||||
)
|
||||
for (const result of results) {
|
||||
if (result.status === 'fulfilled' && result.value) {
|
||||
agentCtxs.push(result.value)
|
||||
} else if (result.status === 'rejected') {
|
||||
logger.error(
|
||||
`[${tracker.requestId}] Failed to resolve resource attachment`,
|
||||
result.reason
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
return agentCtxs
|
||||
})()
|
||||
|
||||
const userMsgPromise = (async () => {
|
||||
if (!actualChatId) return
|
||||
const userMsg = buildPersistedUserMessage({
|
||||
id: userMessageId,
|
||||
content: message,
|
||||
fileAttachments,
|
||||
contexts,
|
||||
})
|
||||
const [updated] = await db
|
||||
.update(copilotChats)
|
||||
.set({
|
||||
@@ -242,11 +238,15 @@ export async function POST(req: NextRequest) {
|
||||
conversationHistory = freshMessages.filter((m: any) => m.id !== userMessageId)
|
||||
taskPubSub?.publishStatusChanged({ workspaceId, chatId: actualChatId, type: 'started' })
|
||||
}
|
||||
}
|
||||
})()
|
||||
|
||||
const [workspaceContext, userPermission] = await Promise.all([
|
||||
generateWorkspaceContext(workspaceId, authenticatedUserId),
|
||||
getUserEntityPermissions(authenticatedUserId, 'workspace', workspaceId).catch(() => null),
|
||||
const [agentContexts, [workspaceContext, userPermission]] = await Promise.all([
|
||||
contextPromise,
|
||||
Promise.all([
|
||||
generateWorkspaceContext(workspaceId, authenticatedUserId),
|
||||
getUserEntityPermissions(authenticatedUserId, 'workspace', workspaceId).catch(() => null),
|
||||
]),
|
||||
userMsgPromise,
|
||||
])
|
||||
|
||||
const requestPayload = await buildCopilotRequestPayload(
|
||||
@@ -267,19 +267,6 @@ export async function POST(req: NextRequest) {
|
||||
{ selectedModel: '' }
|
||||
)
|
||||
|
||||
if (actualChatId) {
|
||||
const acquired = await acquirePendingChatStream(actualChatId, userMessageId)
|
||||
if (!acquired) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
error:
|
||||
'A response is already in progress for this chat. Wait for it to finish or use Stop.',
|
||||
},
|
||||
{ status: 409 }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
const executionId = crypto.randomUUID()
|
||||
const runId = crypto.randomUUID()
|
||||
const stream = createSSEStream({
|
||||
@@ -295,7 +282,6 @@ export async function POST(req: NextRequest) {
|
||||
titleModel: 'claude-opus-4-6',
|
||||
requestId: tracker.requestId,
|
||||
workspaceId,
|
||||
pendingChatStreamAlreadyRegistered: Boolean(actualChatId),
|
||||
orchestrateOptions: {
|
||||
userId: authenticatedUserId,
|
||||
workspaceId,
|
||||
@@ -309,46 +295,7 @@ export async function POST(req: NextRequest) {
|
||||
if (!actualChatId) return
|
||||
if (!result.success) return
|
||||
|
||||
const assistantMessage: Record<string, unknown> = {
|
||||
id: crypto.randomUUID(),
|
||||
role: 'assistant' as const,
|
||||
content: result.content,
|
||||
timestamp: new Date().toISOString(),
|
||||
...(result.requestId ? { requestId: result.requestId } : {}),
|
||||
}
|
||||
if (result.toolCalls.length > 0) {
|
||||
assistantMessage.toolCalls = result.toolCalls
|
||||
}
|
||||
if (result.contentBlocks.length > 0) {
|
||||
assistantMessage.contentBlocks = result.contentBlocks.map((block) => {
|
||||
const stored: Record<string, unknown> = { type: block.type }
|
||||
if (block.content) stored.content = block.content
|
||||
if (block.type === 'tool_call' && block.toolCall) {
|
||||
const state =
|
||||
block.toolCall.result?.success !== undefined
|
||||
? block.toolCall.result.success
|
||||
? 'success'
|
||||
: 'error'
|
||||
: block.toolCall.status
|
||||
const isSubagentTool = !!block.calledBy
|
||||
const isNonTerminal =
|
||||
state === 'cancelled' || state === 'pending' || state === 'executing'
|
||||
stored.toolCall = {
|
||||
id: block.toolCall.id,
|
||||
name: block.toolCall.name,
|
||||
state,
|
||||
...(isSubagentTool && isNonTerminal ? {} : { result: block.toolCall.result }),
|
||||
...(isSubagentTool && isNonTerminal
|
||||
? {}
|
||||
: block.toolCall.params
|
||||
? { params: block.toolCall.params }
|
||||
: {}),
|
||||
...(block.calledBy ? { calledBy: block.calledBy } : {}),
|
||||
}
|
||||
}
|
||||
return stored
|
||||
})
|
||||
}
|
||||
const assistantMessage = buildPersistedAssistantMessage(result, result.requestId)
|
||||
|
||||
try {
|
||||
const [row] = await db
|
||||
@@ -381,7 +328,7 @@ export async function POST(req: NextRequest) {
|
||||
})
|
||||
}
|
||||
} catch (error) {
|
||||
reqLogger.error('Failed to persist chat messages', {
|
||||
logger.error(`[${tracker.requestId}] Failed to persist chat messages`, {
|
||||
chatId: actualChatId,
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
})
|
||||
@@ -392,6 +339,9 @@ export async function POST(req: NextRequest) {
|
||||
|
||||
return new Response(stream, { headers: SSE_RESPONSE_HEADERS })
|
||||
} catch (error) {
|
||||
if (chatStreamLockAcquired && lockChatId && lockStreamId) {
|
||||
await releasePendingChatStream(lockChatId, lockStreamId)
|
||||
}
|
||||
if (error instanceof z.ZodError) {
|
||||
return NextResponse.json(
|
||||
{ error: 'Invalid request data', details: error.errors },
|
||||
@@ -399,11 +349,9 @@ export async function POST(req: NextRequest) {
|
||||
)
|
||||
}
|
||||
|
||||
logger
|
||||
.withMetadata({ requestId: tracker.requestId, messageId: userMessageIdForLogs })
|
||||
.error('Error handling mothership chat', {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
})
|
||||
logger.error(`[${tracker.requestId}] Error handling mothership chat:`, {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
})
|
||||
|
||||
return NextResponse.json(
|
||||
{ error: error instanceof Error ? error.message : 'Internal server error' },
|
||||
|
||||
@@ -5,8 +5,9 @@ import { and, eq, sql } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { releasePendingChatStream } from '@/lib/copilot/chat-streaming'
|
||||
import { taskPubSub } from '@/lib/copilot/task-events'
|
||||
import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message'
|
||||
import { releasePendingChatStream } from '@/lib/copilot/request/session'
|
||||
import { taskPubSub } from '@/lib/copilot/tasks'
|
||||
|
||||
const logger = createLogger('MothershipChatStopAPI')
|
||||
|
||||
@@ -26,15 +27,25 @@ const StoredToolCallSchema = z
|
||||
display: z
|
||||
.object({
|
||||
text: z.string().optional(),
|
||||
title: z.string().optional(),
|
||||
phaseLabel: z.string().optional(),
|
||||
})
|
||||
.optional(),
|
||||
calledBy: z.string().optional(),
|
||||
durationMs: z.number().optional(),
|
||||
error: z.string().optional(),
|
||||
})
|
||||
.nullable()
|
||||
|
||||
const ContentBlockSchema = z.object({
|
||||
type: z.string(),
|
||||
lane: z.enum(['main', 'subagent']).optional(),
|
||||
content: z.string().optional(),
|
||||
channel: z.enum(['assistant', 'thinking']).optional(),
|
||||
phase: z.enum(['call', 'args_delta', 'result']).optional(),
|
||||
kind: z.enum(['subagent', 'structured_result', 'subagent_result']).optional(),
|
||||
lifecycle: z.enum(['start', 'end']).optional(),
|
||||
status: z.enum(['complete', 'error', 'cancelled']).optional(),
|
||||
toolCall: StoredToolCallSchema.optional(),
|
||||
})
|
||||
|
||||
@@ -70,15 +81,14 @@ export async function POST(req: NextRequest) {
|
||||
const hasBlocks = Array.isArray(contentBlocks) && contentBlocks.length > 0
|
||||
|
||||
if (hasContent || hasBlocks) {
|
||||
const assistantMessage: Record<string, unknown> = {
|
||||
const normalized = normalizeMessage({
|
||||
id: crypto.randomUUID(),
|
||||
role: 'assistant' as const,
|
||||
role: 'assistant',
|
||||
content,
|
||||
timestamp: new Date().toISOString(),
|
||||
}
|
||||
if (hasBlocks) {
|
||||
assistantMessage.contentBlocks = contentBlocks
|
||||
}
|
||||
...(hasBlocks ? { contentBlocks } : {}),
|
||||
})
|
||||
const assistantMessage: PersistedMessage = normalized
|
||||
setClause.messages = sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`
|
||||
}
|
||||
|
||||
|
||||
@@ -4,16 +4,15 @@ import { createLogger } from '@sim/logger'
|
||||
import { and, eq, sql } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { getAccessibleCopilotChat } from '@/lib/copilot/chat-lifecycle'
|
||||
import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer'
|
||||
import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle'
|
||||
import {
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
createBadRequestResponse,
|
||||
createInternalServerErrorResponse,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request-helpers'
|
||||
import { taskPubSub } from '@/lib/copilot/task-events'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
} from '@/lib/copilot/request/http'
|
||||
import { readEvents } from '@/lib/copilot/request/session/buffer'
|
||||
import { taskPubSub } from '@/lib/copilot/tasks'
|
||||
|
||||
const logger = createLogger('MothershipChatAPI')
|
||||
|
||||
@@ -47,29 +46,24 @@ export async function GET(
|
||||
}
|
||||
|
||||
let streamSnapshot: {
|
||||
events: Array<{ eventId: number; streamId: string; event: Record<string, unknown> }>
|
||||
events: unknown[]
|
||||
status: string
|
||||
} | null = null
|
||||
|
||||
if (chat.conversationId) {
|
||||
try {
|
||||
const [meta, events] = await Promise.all([
|
||||
getStreamMeta(chat.conversationId),
|
||||
readStreamEvents(chat.conversationId, 0),
|
||||
])
|
||||
const events = await readEvents(chat.conversationId, '0')
|
||||
|
||||
streamSnapshot = {
|
||||
events: events || [],
|
||||
status: meta?.status || 'unknown',
|
||||
status: events.length > 0 ? 'active' : 'unknown',
|
||||
}
|
||||
} catch (error) {
|
||||
logger
|
||||
.withMetadata({ messageId: chat.conversationId || undefined })
|
||||
.warn('Failed to read stream snapshot for mothership chat', {
|
||||
chatId,
|
||||
conversationId: chat.conversationId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
logger.warn('Failed to read stream snapshot for mothership chat', {
|
||||
chatId,
|
||||
conversationId: chat.conversationId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -143,41 +137,12 @@ export async function PATCH(
|
||||
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
|
||||
}
|
||||
|
||||
if (updatedChat.workspaceId) {
|
||||
if (title !== undefined) {
|
||||
taskPubSub?.publishStatusChanged({
|
||||
workspaceId: updatedChat.workspaceId,
|
||||
chatId,
|
||||
type: 'renamed',
|
||||
})
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'task_renamed',
|
||||
{ workspace_id: updatedChat.workspaceId },
|
||||
{
|
||||
groups: { workspace: updatedChat.workspaceId },
|
||||
}
|
||||
)
|
||||
}
|
||||
if (isUnread === false) {
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'task_marked_read',
|
||||
{ workspace_id: updatedChat.workspaceId },
|
||||
{
|
||||
groups: { workspace: updatedChat.workspaceId },
|
||||
}
|
||||
)
|
||||
} else if (isUnread === true) {
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'task_marked_unread',
|
||||
{ workspace_id: updatedChat.workspaceId },
|
||||
{
|
||||
groups: { workspace: updatedChat.workspaceId },
|
||||
}
|
||||
)
|
||||
}
|
||||
if (title !== undefined && updatedChat.workspaceId) {
|
||||
taskPubSub?.publishStatusChanged({
|
||||
workspaceId: updatedChat.workspaceId,
|
||||
chatId,
|
||||
type: 'renamed',
|
||||
})
|
||||
}
|
||||
|
||||
return NextResponse.json({ success: true })
|
||||
@@ -233,14 +198,6 @@ export async function DELETE(
|
||||
chatId,
|
||||
type: 'deleted',
|
||||
})
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'task_deleted',
|
||||
{ workspace_id: deletedChat.workspaceId },
|
||||
{
|
||||
groups: { workspace: deletedChat.workspaceId },
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
return NextResponse.json({ success: true })
|
||||
|
||||
43
apps/sim/app/api/mothership/chats/read/route.ts
Normal file
43
apps/sim/app/api/mothership/chats/read/route.ts
Normal file
@@ -0,0 +1,43 @@
|
||||
import { db } from '@sim/db'
|
||||
import { copilotChats } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, eq, sql } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import {
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
createBadRequestResponse,
|
||||
createInternalServerErrorResponse,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request/http'
|
||||
|
||||
const logger = createLogger('MarkTaskReadAPI')
|
||||
|
||||
const MarkReadSchema = z.object({
|
||||
chatId: z.string().min(1),
|
||||
})
|
||||
|
||||
export async function POST(request: NextRequest) {
|
||||
try {
|
||||
const { userId, isAuthenticated } = await authenticateCopilotRequestSessionOnly()
|
||||
if (!isAuthenticated || !userId) {
|
||||
return createUnauthorizedResponse()
|
||||
}
|
||||
|
||||
const body = await request.json()
|
||||
const { chatId } = MarkReadSchema.parse(body)
|
||||
|
||||
await db
|
||||
.update(copilotChats)
|
||||
.set({ lastSeenAt: sql`GREATEST(${copilotChats.updatedAt}, NOW())` })
|
||||
.where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, userId)))
|
||||
|
||||
return NextResponse.json({ success: true })
|
||||
} catch (error) {
|
||||
if (error instanceof z.ZodError) {
|
||||
return createBadRequestResponse('chatId is required')
|
||||
}
|
||||
logger.error('Error marking task as read:', error)
|
||||
return createInternalServerErrorResponse('Failed to mark task as read')
|
||||
}
|
||||
}
|
||||
@@ -9,9 +9,8 @@ import {
|
||||
createBadRequestResponse,
|
||||
createInternalServerErrorResponse,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request-helpers'
|
||||
import { taskPubSub } from '@/lib/copilot/task-events'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
} from '@/lib/copilot/request/http'
|
||||
import { taskPubSub } from '@/lib/copilot/tasks'
|
||||
import { assertActiveWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
|
||||
|
||||
const logger = createLogger('MothershipChatsAPI')
|
||||
@@ -39,7 +38,7 @@ export async function GET(request: NextRequest) {
|
||||
id: copilotChats.id,
|
||||
title: copilotChats.title,
|
||||
updatedAt: copilotChats.updatedAt,
|
||||
conversationId: copilotChats.conversationId,
|
||||
activeStreamId: copilotChats.conversationId,
|
||||
lastSeenAt: copilotChats.lastSeenAt,
|
||||
})
|
||||
.from(copilotChats)
|
||||
@@ -96,15 +95,6 @@ export async function POST(request: NextRequest) {
|
||||
|
||||
taskPubSub?.publishStatusChanged({ workspaceId, chatId: chat.id, type: 'created' })
|
||||
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'task_created',
|
||||
{ workspace_id: workspaceId },
|
||||
{
|
||||
groups: { workspace: workspaceId },
|
||||
}
|
||||
)
|
||||
|
||||
return NextResponse.json({ success: true, id: chat.id })
|
||||
} catch (error) {
|
||||
if (error instanceof z.ZodError) {
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
* Auth is handled via session cookies (EventSource sends cookies automatically).
|
||||
*/
|
||||
|
||||
import { taskPubSub } from '@/lib/copilot/task-events'
|
||||
import { taskPubSub } from '@/lib/copilot/tasks'
|
||||
import { createWorkspaceSSE } from '@/lib/events/sse-endpoint'
|
||||
|
||||
export const dynamic = 'force-dynamic'
|
||||
|
||||
@@ -2,10 +2,9 @@ import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { checkInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { createRunSegment } from '@/lib/copilot/async-runs/repository'
|
||||
import { buildIntegrationToolSchemas } from '@/lib/copilot/chat-payload'
|
||||
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
|
||||
import { generateWorkspaceContext } from '@/lib/copilot/workspace-context'
|
||||
import { buildIntegrationToolSchemas } from '@/lib/copilot/chat/payload'
|
||||
import { generateWorkspaceContext } from '@/lib/copilot/chat/workspace-context'
|
||||
import { runCopilotLifecycle } from '@/lib/copilot/request/lifecycle/run'
|
||||
import {
|
||||
assertActiveWorkspaceAccess,
|
||||
getUserEntityPermissions,
|
||||
@@ -72,34 +71,25 @@ export async function POST(req: NextRequest) {
|
||||
...(userPermission ? { userPermission } : {}),
|
||||
}
|
||||
|
||||
const executionId = crypto.randomUUID()
|
||||
const runId = crypto.randomUUID()
|
||||
|
||||
await createRunSegment({
|
||||
id: runId,
|
||||
executionId,
|
||||
chatId: effectiveChatId,
|
||||
userId,
|
||||
workspaceId,
|
||||
streamId: messageId,
|
||||
}).catch(() => {})
|
||||
|
||||
const result = await orchestrateCopilotStream(requestPayload, {
|
||||
const result = await runCopilotLifecycle(requestPayload, {
|
||||
userId,
|
||||
workspaceId,
|
||||
chatId: effectiveChatId,
|
||||
executionId,
|
||||
runId,
|
||||
goRoute: '/api/mothership/execute',
|
||||
autoExecuteTools: true,
|
||||
interactive: false,
|
||||
})
|
||||
|
||||
if (!result.success) {
|
||||
reqLogger.error('Mothership execute failed', {
|
||||
error: result.error,
|
||||
errors: result.errors,
|
||||
})
|
||||
logger.error(
|
||||
messageId
|
||||
? `Mothership execute failed [messageId:${messageId}]`
|
||||
: 'Mothership execute failed',
|
||||
{
|
||||
error: result.error,
|
||||
errors: result.errors,
|
||||
}
|
||||
)
|
||||
return NextResponse.json(
|
||||
{
|
||||
error: result.error || 'Mothership execution failed',
|
||||
@@ -135,9 +125,12 @@ export async function POST(req: NextRequest) {
|
||||
)
|
||||
}
|
||||
|
||||
logger.withMetadata({ messageId }).error('Mothership execute error', {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
})
|
||||
logger.error(
|
||||
messageId ? `Mothership execute error [messageId:${messageId}]` : 'Mothership execute error',
|
||||
{
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
}
|
||||
)
|
||||
|
||||
return NextResponse.json(
|
||||
{ error: error instanceof Error ? error.message : 'Internal server error' },
|
||||
|
||||
@@ -14,7 +14,6 @@ const {
|
||||
mockDbReturning,
|
||||
mockDbUpdate,
|
||||
mockEnqueue,
|
||||
mockEnqueueWorkspaceDispatch,
|
||||
mockStartJob,
|
||||
mockCompleteJob,
|
||||
mockMarkJobFailed,
|
||||
@@ -24,7 +23,6 @@ const {
|
||||
const mockDbSet = vi.fn().mockReturnValue({ where: mockDbWhere })
|
||||
const mockDbUpdate = vi.fn().mockReturnValue({ set: mockDbSet })
|
||||
const mockEnqueue = vi.fn().mockResolvedValue('job-id-1')
|
||||
const mockEnqueueWorkspaceDispatch = vi.fn().mockResolvedValue('job-id-1')
|
||||
const mockStartJob = vi.fn().mockResolvedValue(undefined)
|
||||
const mockCompleteJob = vi.fn().mockResolvedValue(undefined)
|
||||
const mockMarkJobFailed = vi.fn().mockResolvedValue(undefined)
|
||||
@@ -42,7 +40,6 @@ const {
|
||||
mockDbReturning,
|
||||
mockDbUpdate,
|
||||
mockEnqueue,
|
||||
mockEnqueueWorkspaceDispatch,
|
||||
mockStartJob,
|
||||
mockCompleteJob,
|
||||
mockMarkJobFailed,
|
||||
@@ -75,15 +72,6 @@ vi.mock('@/lib/core/async-jobs', () => ({
|
||||
shouldExecuteInline: vi.fn().mockReturnValue(false),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/core/bullmq', () => ({
|
||||
isBullMQEnabled: vi.fn().mockReturnValue(true),
|
||||
createBullMQJobData: vi.fn((payload: unknown) => ({ payload })),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/core/workspace-dispatch', () => ({
|
||||
enqueueWorkspaceDispatch: mockEnqueueWorkspaceDispatch,
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/workflows/utils', () => ({
|
||||
getWorkflowById: vi.fn().mockResolvedValue({
|
||||
id: 'workflow-1',
|
||||
@@ -246,29 +234,19 @@ describe('Scheduled Workflow Execution API Route', () => {
|
||||
expect(data).toHaveProperty('executedCount', 2)
|
||||
})
|
||||
|
||||
it('should queue mothership jobs to BullMQ when available', async () => {
|
||||
it('should execute mothership jobs inline', async () => {
|
||||
mockDbReturning.mockReturnValueOnce([]).mockReturnValueOnce(SINGLE_JOB)
|
||||
|
||||
const response = await GET(createMockRequest())
|
||||
|
||||
expect(response.status).toBe(200)
|
||||
expect(mockEnqueueWorkspaceDispatch).toHaveBeenCalledWith(
|
||||
expect(mockExecuteJobInline).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
workspaceId: 'workspace-1',
|
||||
lane: 'runtime',
|
||||
queueName: 'mothership-job-execution',
|
||||
bullmqJobName: 'mothership-job-execution',
|
||||
bullmqPayload: {
|
||||
payload: {
|
||||
scheduleId: 'job-1',
|
||||
cronExpression: '0 * * * *',
|
||||
failedCount: 0,
|
||||
now: expect.any(String),
|
||||
},
|
||||
},
|
||||
scheduleId: 'job-1',
|
||||
cronExpression: '0 * * * *',
|
||||
failedCount: 0,
|
||||
})
|
||||
)
|
||||
expect(mockExecuteJobInline).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should enqueue preassigned correlation metadata for schedules', async () => {
|
||||
@@ -277,25 +255,23 @@ describe('Scheduled Workflow Execution API Route', () => {
|
||||
const response = await GET(createMockRequest())
|
||||
|
||||
expect(response.status).toBe(200)
|
||||
expect(mockEnqueueWorkspaceDispatch).toHaveBeenCalledWith(
|
||||
expect(mockEnqueue).toHaveBeenCalledWith(
|
||||
'schedule-execution',
|
||||
expect.objectContaining({
|
||||
id: 'schedule-execution-1',
|
||||
workspaceId: 'workspace-1',
|
||||
lane: 'runtime',
|
||||
queueName: 'schedule-execution',
|
||||
bullmqJobName: 'schedule-execution',
|
||||
metadata: {
|
||||
scheduleId: 'schedule-1',
|
||||
workflowId: 'workflow-1',
|
||||
executionId: 'schedule-execution-1',
|
||||
}),
|
||||
expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
workflowId: 'workflow-1',
|
||||
correlation: {
|
||||
correlation: expect.objectContaining({
|
||||
executionId: 'schedule-execution-1',
|
||||
requestId: 'test-request-id',
|
||||
source: 'schedule',
|
||||
workflowId: 'workflow-1',
|
||||
scheduleId: 'schedule-1',
|
||||
triggerType: 'schedule',
|
||||
scheduledFor: '2025-01-01T00:00:00.000Z',
|
||||
},
|
||||
},
|
||||
}),
|
||||
}),
|
||||
})
|
||||
)
|
||||
})
|
||||
|
||||
@@ -5,9 +5,7 @@ import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { verifyCronAuth } from '@/lib/auth/internal'
|
||||
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
|
||||
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
|
||||
import {
|
||||
executeJobInline,
|
||||
executeScheduleJob,
|
||||
@@ -121,38 +119,13 @@ export async function GET(request: NextRequest) {
|
||||
: null
|
||||
const resolvedWorkspaceId = resolvedWorkflow?.workspaceId
|
||||
|
||||
let jobId: string
|
||||
if (isBullMQEnabled()) {
|
||||
if (!resolvedWorkspaceId) {
|
||||
throw new Error(
|
||||
`Missing workspace for scheduled workflow ${schedule.workflowId}; refusing to bypass workspace admission`
|
||||
)
|
||||
}
|
||||
|
||||
jobId = await enqueueWorkspaceDispatch({
|
||||
id: executionId,
|
||||
workspaceId: resolvedWorkspaceId,
|
||||
lane: 'runtime',
|
||||
queueName: 'schedule-execution',
|
||||
bullmqJobName: 'schedule-execution',
|
||||
bullmqPayload: createBullMQJobData(payload, {
|
||||
workflowId: schedule.workflowId ?? undefined,
|
||||
correlation,
|
||||
}),
|
||||
metadata: {
|
||||
workflowId: schedule.workflowId ?? undefined,
|
||||
correlation,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
jobId = await jobQueue.enqueue('schedule-execution', payload, {
|
||||
metadata: {
|
||||
workflowId: schedule.workflowId ?? undefined,
|
||||
workspaceId: resolvedWorkspaceId ?? undefined,
|
||||
correlation,
|
||||
},
|
||||
})
|
||||
}
|
||||
const jobId = await jobQueue.enqueue('schedule-execution', payload, {
|
||||
metadata: {
|
||||
workflowId: schedule.workflowId ?? undefined,
|
||||
workspaceId: resolvedWorkspaceId ?? undefined,
|
||||
correlation,
|
||||
},
|
||||
})
|
||||
logger.info(
|
||||
`[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}`
|
||||
)
|
||||
@@ -204,7 +177,7 @@ export async function GET(request: NextRequest) {
|
||||
}
|
||||
})
|
||||
|
||||
// Mothership jobs use BullMQ when available, otherwise direct inline execution.
|
||||
// Mothership jobs execute inline directly.
|
||||
const jobPromises = dueJobs.map(async (job) => {
|
||||
const queueTime = job.lastQueuedAt ?? queuedAt
|
||||
const payload = {
|
||||
@@ -215,24 +188,7 @@ export async function GET(request: NextRequest) {
|
||||
}
|
||||
|
||||
try {
|
||||
if (isBullMQEnabled()) {
|
||||
if (!job.sourceWorkspaceId || !job.sourceUserId) {
|
||||
throw new Error(`Mothership job ${job.id} is missing workspace/user ownership`)
|
||||
}
|
||||
|
||||
await enqueueWorkspaceDispatch({
|
||||
workspaceId: job.sourceWorkspaceId!,
|
||||
lane: 'runtime',
|
||||
queueName: 'mothership-job-execution',
|
||||
bullmqJobName: 'mothership-job-execution',
|
||||
bullmqPayload: createBullMQJobData(payload),
|
||||
metadata: {
|
||||
userId: job.sourceUserId,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
await executeJobInline(payload)
|
||||
}
|
||||
await executeJobInline(payload)
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Job execution failed for ${job.id}`, {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
|
||||
@@ -4,7 +4,6 @@ import { z } from 'zod'
|
||||
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { deleteSkill, listSkills, upsertSkills } from '@/lib/workflows/skills/operations'
|
||||
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
|
||||
|
||||
@@ -24,7 +23,6 @@ const SkillSchema = z.object({
|
||||
})
|
||||
),
|
||||
workspaceId: z.string().optional(),
|
||||
source: z.enum(['settings', 'tool_input']).optional(),
|
||||
})
|
||||
|
||||
/** GET - Fetch all skills for a workspace */
|
||||
@@ -77,7 +75,7 @@ export async function POST(req: NextRequest) {
|
||||
const body = await req.json()
|
||||
|
||||
try {
|
||||
const { skills, workspaceId, source } = SkillSchema.parse(body)
|
||||
const { skills, workspaceId } = SkillSchema.parse(body)
|
||||
|
||||
if (!workspaceId) {
|
||||
logger.warn(`[${requestId}] Missing workspaceId in request body`)
|
||||
@@ -109,12 +107,6 @@ export async function POST(req: NextRequest) {
|
||||
resourceName: skill.name,
|
||||
description: `Created/updated skill "${skill.name}"`,
|
||||
})
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'skill_created',
|
||||
{ skill_id: skill.id, skill_name: skill.name, workspace_id: workspaceId, source },
|
||||
{ groups: { workspace: workspaceId } }
|
||||
)
|
||||
}
|
||||
|
||||
return NextResponse.json({ success: true, data: resultSkills })
|
||||
@@ -145,9 +137,6 @@ export async function DELETE(request: NextRequest) {
|
||||
const searchParams = request.nextUrl.searchParams
|
||||
const skillId = searchParams.get('id')
|
||||
const workspaceId = searchParams.get('workspaceId')
|
||||
const sourceParam = searchParams.get('source')
|
||||
const source =
|
||||
sourceParam === 'settings' || sourceParam === 'tool_input' ? sourceParam : undefined
|
||||
|
||||
try {
|
||||
const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
|
||||
@@ -191,13 +180,6 @@ export async function DELETE(request: NextRequest) {
|
||||
description: `Deleted skill`,
|
||||
})
|
||||
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'skill_deleted',
|
||||
{ skill_id: skillId, workspace_id: workspaceId, source },
|
||||
{ groups: { workspace: workspaceId } }
|
||||
)
|
||||
|
||||
logger.info(`[${requestId}] Deleted skill: ${skillId}`)
|
||||
return NextResponse.json({ success: true })
|
||||
} catch (error) {
|
||||
|
||||
@@ -3,7 +3,6 @@ import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import {
|
||||
deleteTable,
|
||||
NAME_PATTERN,
|
||||
@@ -184,13 +183,6 @@ export async function DELETE(request: NextRequest, { params }: TableRouteParams)
|
||||
|
||||
await deleteTable(tableId, requestId)
|
||||
|
||||
captureServerEvent(
|
||||
authResult.userId,
|
||||
'table_deleted',
|
||||
{ table_id: tableId, workspace_id: table.workspaceId },
|
||||
{ groups: { workspace: table.workspaceId } }
|
||||
)
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
data: {
|
||||
|
||||
@@ -3,7 +3,6 @@ import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import {
|
||||
createTable,
|
||||
getWorkspaceTableLimits,
|
||||
@@ -142,20 +141,6 @@ export async function POST(request: NextRequest) {
|
||||
requestId
|
||||
)
|
||||
|
||||
captureServerEvent(
|
||||
authResult.userId,
|
||||
'table_created',
|
||||
{
|
||||
table_id: table.id,
|
||||
workspace_id: params.workspaceId,
|
||||
column_count: params.schema.columns.length,
|
||||
},
|
||||
{
|
||||
groups: { workspace: params.workspaceId },
|
||||
setOnce: { first_table_created_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
return NextResponse.json({
|
||||
success: true,
|
||||
data: {
|
||||
|
||||
@@ -3,7 +3,7 @@ import { templates } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { checkInternalApiKey } from '@/lib/copilot/utils'
|
||||
import { checkInternalApiKey } from '@/lib/copilot/request/http'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { sanitizeForCopilot } from '@/lib/workflows/sanitization/json-sanitizer'
|
||||
|
||||
|
||||
@@ -7,7 +7,6 @@ import { z } from 'zod'
|
||||
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { upsertCustomTools } from '@/lib/workflows/custom-tools/operations'
|
||||
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
|
||||
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
|
||||
@@ -35,7 +34,6 @@ const CustomToolSchema = z.object({
|
||||
})
|
||||
),
|
||||
workspaceId: z.string().optional(),
|
||||
source: z.enum(['settings', 'tool_input']).optional(),
|
||||
})
|
||||
|
||||
// GET - Fetch all custom tools for the workspace
|
||||
@@ -137,7 +135,7 @@ export async function POST(req: NextRequest) {
|
||||
|
||||
try {
|
||||
// Validate the request body
|
||||
const { tools, workspaceId, source } = CustomToolSchema.parse(body)
|
||||
const { tools, workspaceId } = CustomToolSchema.parse(body)
|
||||
|
||||
if (!workspaceId) {
|
||||
logger.warn(`[${requestId}] Missing workspaceId in request body`)
|
||||
@@ -170,16 +168,6 @@ export async function POST(req: NextRequest) {
|
||||
})
|
||||
|
||||
for (const tool of resultTools) {
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'custom_tool_saved',
|
||||
{ tool_id: tool.id, workspace_id: workspaceId, tool_name: tool.title, source },
|
||||
{
|
||||
groups: { workspace: workspaceId },
|
||||
setOnce: { first_custom_tool_saved_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId,
|
||||
actorId: userId,
|
||||
@@ -217,9 +205,6 @@ export async function DELETE(request: NextRequest) {
|
||||
const searchParams = request.nextUrl.searchParams
|
||||
const toolId = searchParams.get('id')
|
||||
const workspaceId = searchParams.get('workspaceId')
|
||||
const sourceParam = searchParams.get('source')
|
||||
const source =
|
||||
sourceParam === 'settings' || sourceParam === 'tool_input' ? sourceParam : undefined
|
||||
|
||||
if (!toolId) {
|
||||
logger.warn(`[${requestId}] Missing tool ID for deletion`)
|
||||
@@ -293,14 +278,6 @@ export async function DELETE(request: NextRequest) {
|
||||
// Delete the tool
|
||||
await db.delete(customTools).where(eq(customTools.id, toolId))
|
||||
|
||||
const toolWorkspaceId = tool.workspaceId ?? workspaceId ?? ''
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'custom_tool_deleted',
|
||||
{ tool_id: toolId, workspace_id: toolWorkspaceId, source },
|
||||
toolWorkspaceId ? { groups: { workspace: toolWorkspaceId } } : undefined
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId: tool.workspaceId || undefined,
|
||||
actorId: userId,
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { createRunSegment } from '@/lib/copilot/async-runs/repository'
|
||||
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
|
||||
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
|
||||
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/constants'
|
||||
import { runCopilotLifecycle } from '@/lib/copilot/request/lifecycle/run'
|
||||
import { getWorkflowById, resolveWorkflowIdForUser } from '@/lib/workflows/utils'
|
||||
import { authenticateV1Request } from '@/app/api/v1/auth'
|
||||
|
||||
@@ -83,15 +82,19 @@ export async function POST(req: NextRequest) {
|
||||
const chatId = parsed.chatId || crypto.randomUUID()
|
||||
|
||||
messageId = crypto.randomUUID()
|
||||
const reqLogger = logger.withMetadata({ messageId })
|
||||
reqLogger.info('Received headless copilot chat start request', {
|
||||
workflowId: resolved.workflowId,
|
||||
workflowName: parsed.workflowName,
|
||||
chatId,
|
||||
mode: transportMode,
|
||||
autoExecuteTools: parsed.autoExecuteTools,
|
||||
timeout: parsed.timeout,
|
||||
})
|
||||
logger.info(
|
||||
messageId
|
||||
? `Received headless copilot chat start request [messageId:${messageId}]`
|
||||
: 'Received headless copilot chat start request',
|
||||
{
|
||||
workflowId: resolved.workflowId,
|
||||
workflowName: parsed.workflowName,
|
||||
chatId,
|
||||
mode: transportMode,
|
||||
autoExecuteTools: parsed.autoExecuteTools,
|
||||
timeout: parsed.timeout,
|
||||
}
|
||||
)
|
||||
const requestPayload = {
|
||||
message: parsed.message,
|
||||
workflowId: resolved.workflowId,
|
||||
@@ -102,24 +105,10 @@ export async function POST(req: NextRequest) {
|
||||
chatId,
|
||||
}
|
||||
|
||||
const executionId = crypto.randomUUID()
|
||||
const runId = crypto.randomUUID()
|
||||
|
||||
await createRunSegment({
|
||||
id: runId,
|
||||
executionId,
|
||||
chatId,
|
||||
userId: auth.userId,
|
||||
workflowId: resolved.workflowId,
|
||||
streamId: messageId,
|
||||
}).catch(() => {})
|
||||
|
||||
const result = await orchestrateCopilotStream(requestPayload, {
|
||||
const result = await runCopilotLifecycle(requestPayload, {
|
||||
userId: auth.userId,
|
||||
workflowId: resolved.workflowId,
|
||||
chatId,
|
||||
executionId,
|
||||
runId,
|
||||
goRoute: '/api/mcp',
|
||||
autoExecuteTools: parsed.autoExecuteTools,
|
||||
timeout: parsed.timeout,
|
||||
@@ -141,9 +130,14 @@ export async function POST(req: NextRequest) {
|
||||
)
|
||||
}
|
||||
|
||||
logger.withMetadata({ messageId }).error('Headless copilot request failed', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
logger.error(
|
||||
messageId
|
||||
? `Headless copilot request failed [messageId:${messageId}]`
|
||||
: 'Headless copilot request failed',
|
||||
{
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
}
|
||||
)
|
||||
return NextResponse.json({ success: false, error: 'Internal server error' }, { status: 500 })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { validateInteger } from '@/lib/core/security/input-validation'
|
||||
import { PlatformEvents } from '@/lib/core/telemetry'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { cleanupExternalWebhook } from '@/lib/webhooks/provider-subscriptions'
|
||||
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
|
||||
|
||||
@@ -275,19 +274,6 @@ export async function DELETE(
|
||||
request,
|
||||
})
|
||||
|
||||
const wsId = webhookData.workflow.workspaceId || undefined
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'webhook_trigger_deleted',
|
||||
{
|
||||
webhook_id: id,
|
||||
workflow_id: webhookData.workflow.id,
|
||||
provider: foundWebhook.provider || 'generic',
|
||||
workspace_id: wsId ?? '',
|
||||
},
|
||||
wsId ? { groups: { workspace: wsId } } : undefined
|
||||
)
|
||||
|
||||
return NextResponse.json({ success: true }, { status: 200 })
|
||||
} catch (error: any) {
|
||||
logger.error(`[${requestId}] Error deleting webhook`, {
|
||||
|
||||
@@ -9,7 +9,6 @@ import { getSession } from '@/lib/auth'
|
||||
import { PlatformEvents } from '@/lib/core/telemetry'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { getProviderIdFromServiceId } from '@/lib/oauth'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { resolveEnvVarsInObject } from '@/lib/webhooks/env-resolver'
|
||||
import {
|
||||
cleanupExternalWebhook,
|
||||
@@ -764,19 +763,6 @@ export async function POST(request: NextRequest) {
|
||||
metadata: { provider, workflowId },
|
||||
request,
|
||||
})
|
||||
|
||||
const wsId = workflowRecord.workspaceId || undefined
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'webhook_trigger_created',
|
||||
{
|
||||
webhook_id: savedWebhook.id,
|
||||
workflow_id: workflowId,
|
||||
provider: provider || 'generic',
|
||||
workspace_id: wsId ?? '',
|
||||
},
|
||||
wsId ? { groups: { workspace: wsId } } : undefined
|
||||
)
|
||||
}
|
||||
|
||||
const status = targetWebhookId ? 200 : 201
|
||||
|
||||
@@ -2,7 +2,6 @@ import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { admissionRejectedResponse, tryAdmit } from '@/lib/core/admission/gate'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { DispatchQueueFullError } from '@/lib/core/workspace-dispatch'
|
||||
import {
|
||||
checkWebhookPreprocessing,
|
||||
findAllWebhooksForPath,
|
||||
@@ -156,29 +155,14 @@ async function handleWebhookPost(
|
||||
if (shouldSkipWebhookEvent(foundWebhook, body, requestId)) {
|
||||
continue
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await queueWebhookExecution(foundWebhook, foundWorkflow, body, request, {
|
||||
requestId,
|
||||
path,
|
||||
actorUserId: preprocessResult.actorUserId,
|
||||
executionId: preprocessResult.executionId,
|
||||
correlation: preprocessResult.correlation,
|
||||
})
|
||||
responses.push(response)
|
||||
} catch (error) {
|
||||
if (error instanceof DispatchQueueFullError) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
error: 'Service temporarily at capacity',
|
||||
message: error.message,
|
||||
retryAfterSeconds: 10,
|
||||
},
|
||||
{ status: 503, headers: { 'Retry-After': '10' } }
|
||||
)
|
||||
}
|
||||
throw error
|
||||
}
|
||||
const response = await queueWebhookExecution(foundWebhook, foundWorkflow, body, request, {
|
||||
requestId,
|
||||
path,
|
||||
actorUserId: preprocessResult.actorUserId,
|
||||
executionId: preprocessResult.executionId,
|
||||
correlation: preprocessResult.correlation,
|
||||
})
|
||||
responses.push(response)
|
||||
}
|
||||
|
||||
if (responses.length === 0) {
|
||||
|
||||
@@ -3,7 +3,6 @@ import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { performFullDeploy, performFullUndeploy } from '@/lib/workflows/orchestration'
|
||||
import { validateWorkflowPermissions } from '@/lib/workflows/utils'
|
||||
import {
|
||||
@@ -97,16 +96,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
|
||||
logger.info(`[${requestId}] Workflow deployed successfully: ${id}`)
|
||||
|
||||
captureServerEvent(
|
||||
actorUserId,
|
||||
'workflow_deployed',
|
||||
{ workflow_id: id, workspace_id: workflowData!.workspaceId ?? '' },
|
||||
{
|
||||
groups: workflowData!.workspaceId ? { workspace: workflowData!.workspaceId } : undefined,
|
||||
setOnce: { first_workflow_deployed_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
const responseApiKeyInfo = workflowData!.workspaceId
|
||||
? 'Workspace API keys'
|
||||
: 'Personal API keys'
|
||||
@@ -129,11 +118,7 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
|
||||
const { id } = await params
|
||||
|
||||
try {
|
||||
const {
|
||||
error,
|
||||
session,
|
||||
workflow: workflowData,
|
||||
} = await validateWorkflowPermissions(id, requestId, 'admin')
|
||||
const { error, session } = await validateWorkflowPermissions(id, requestId, 'admin')
|
||||
if (error) {
|
||||
return createErrorResponse(error.message, error.status)
|
||||
}
|
||||
@@ -163,14 +148,6 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
|
||||
|
||||
logger.info(`[${requestId}] Updated isPublicApi for workflow ${id} to ${isPublicApi}`)
|
||||
|
||||
const wsId = workflowData?.workspaceId
|
||||
captureServerEvent(
|
||||
session!.user.id,
|
||||
'workflow_public_api_toggled',
|
||||
{ workflow_id: id, workspace_id: wsId ?? '', is_public: isPublicApi },
|
||||
wsId ? { groups: { workspace: wsId } } : undefined
|
||||
)
|
||||
|
||||
return createSuccessResponse({ isPublicApi })
|
||||
} catch (error: unknown) {
|
||||
const message = error instanceof Error ? error.message : 'Failed to update deployment settings'
|
||||
@@ -187,11 +164,7 @@ export async function DELETE(
|
||||
const { id } = await params
|
||||
|
||||
try {
|
||||
const {
|
||||
error,
|
||||
session,
|
||||
workflow: workflowData,
|
||||
} = await validateWorkflowPermissions(id, requestId, 'admin')
|
||||
const { error, session } = await validateWorkflowPermissions(id, requestId, 'admin')
|
||||
if (error) {
|
||||
return createErrorResponse(error.message, error.status)
|
||||
}
|
||||
@@ -206,14 +179,6 @@ export async function DELETE(
|
||||
return createErrorResponse(result.error || 'Failed to undeploy workflow', 500)
|
||||
}
|
||||
|
||||
const wsId = workflowData?.workspaceId
|
||||
captureServerEvent(
|
||||
session!.user.id,
|
||||
'workflow_undeployed',
|
||||
{ workflow_id: id, workspace_id: wsId ?? '' },
|
||||
wsId ? { groups: { workspace: wsId } } : undefined
|
||||
)
|
||||
|
||||
return createSuccessResponse({
|
||||
isDeployed: false,
|
||||
deployedAt: null,
|
||||
|
||||
@@ -5,7 +5,6 @@ import type { NextRequest } from 'next/server'
|
||||
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/persistence/utils'
|
||||
import { validateWorkflowPermissions } from '@/lib/workflows/utils'
|
||||
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
|
||||
@@ -79,6 +78,7 @@ export async function POST(
|
||||
loops: deployedState.loops || {},
|
||||
parallels: deployedState.parallels || {},
|
||||
lastSaved: Date.now(),
|
||||
deploymentStatuses: deployedState.deploymentStatuses || {},
|
||||
})
|
||||
|
||||
if (!saveResult.success) {
|
||||
@@ -104,19 +104,6 @@ export async function POST(
|
||||
logger.error('Error sending workflow reverted event to socket server', e)
|
||||
}
|
||||
|
||||
captureServerEvent(
|
||||
session!.user.id,
|
||||
'workflow_deployment_reverted',
|
||||
{
|
||||
workflow_id: id,
|
||||
workspace_id: workflowRecord?.workspaceId ?? '',
|
||||
version,
|
||||
},
|
||||
workflowRecord?.workspaceId
|
||||
? { groups: { workspace: workflowRecord.workspaceId } }
|
||||
: undefined
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId: workflowRecord?.workspaceId ?? null,
|
||||
actorId: session!.user.id,
|
||||
|
||||
@@ -4,7 +4,6 @@ import { and, eq } from 'drizzle-orm'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { performActivateVersion } from '@/lib/workflows/orchestration'
|
||||
import { validateWorkflowPermissions } from '@/lib/workflows/utils'
|
||||
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
|
||||
@@ -175,14 +174,6 @@ export async function PATCH(
|
||||
}
|
||||
}
|
||||
|
||||
const wsId = (workflowData as { workspaceId?: string } | null)?.workspaceId
|
||||
captureServerEvent(
|
||||
actorUserId,
|
||||
'deployment_version_activated',
|
||||
{ workflow_id: id, workspace_id: wsId ?? '', version: versionNum },
|
||||
wsId ? { groups: { workspace: wsId } } : undefined
|
||||
)
|
||||
|
||||
return createSuccessResponse({
|
||||
success: true,
|
||||
deployedAt: activateResult.deployedAt,
|
||||
|
||||
@@ -5,7 +5,6 @@ import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { PlatformEvents } from '@/lib/core/telemetry'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { duplicateWorkflow } from '@/lib/workflows/persistence/duplicate'
|
||||
|
||||
const logger = createLogger('WorkflowDuplicateAPI')
|
||||
@@ -61,17 +60,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
// Telemetry should not fail the operation
|
||||
}
|
||||
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'workflow_duplicated',
|
||||
{
|
||||
source_workflow_id: sourceWorkflowId,
|
||||
new_workflow_id: result.id,
|
||||
workspace_id: workspaceId ?? '',
|
||||
},
|
||||
workspaceId ? { groups: { workspace: workspaceId } } : undefined
|
||||
)
|
||||
|
||||
const elapsed = Date.now() - startTime
|
||||
logger.info(
|
||||
`[${requestId}] Successfully duplicated workflow ${sourceWorkflowId} to ${result.id} in ${elapsed}ms`
|
||||
|
||||
@@ -10,13 +10,11 @@ const {
|
||||
mockAuthorizeWorkflowByWorkspacePermission,
|
||||
mockPreprocessExecution,
|
||||
mockEnqueue,
|
||||
mockEnqueueWorkspaceDispatch,
|
||||
} = vi.hoisted(() => ({
|
||||
mockCheckHybridAuth: vi.fn(),
|
||||
mockAuthorizeWorkflowByWorkspacePermission: vi.fn(),
|
||||
mockPreprocessExecution: vi.fn(),
|
||||
mockEnqueue: vi.fn().mockResolvedValue('job-123'),
|
||||
mockEnqueueWorkspaceDispatch: vi.fn().mockResolvedValue('job-123'),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/auth/hybrid', () => ({
|
||||
@@ -47,16 +45,6 @@ vi.mock('@/lib/core/async-jobs', () => ({
|
||||
markJobFailed: vi.fn(),
|
||||
}),
|
||||
shouldExecuteInline: vi.fn().mockReturnValue(false),
|
||||
shouldUseBullMQ: vi.fn().mockReturnValue(true),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/core/bullmq', () => ({
|
||||
createBullMQJobData: vi.fn((payload: unknown, metadata?: unknown) => ({ payload, metadata })),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/core/workspace-dispatch', () => ({
|
||||
enqueueWorkspaceDispatch: mockEnqueueWorkspaceDispatch,
|
||||
waitForDispatchJob: vi.fn(),
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/core/utils/request', () => ({
|
||||
@@ -147,24 +135,20 @@ describe('workflow execute async route', () => {
|
||||
expect(response.status).toBe(202)
|
||||
expect(body.executionId).toBe('execution-123')
|
||||
expect(body.jobId).toBe('job-123')
|
||||
expect(mockEnqueueWorkspaceDispatch).toHaveBeenCalledWith(
|
||||
expect(mockEnqueue).toHaveBeenCalledWith(
|
||||
'workflow-execution',
|
||||
expect.objectContaining({
|
||||
id: 'execution-123',
|
||||
workflowId: 'workflow-1',
|
||||
userId: 'actor-1',
|
||||
workspaceId: 'workspace-1',
|
||||
lane: 'runtime',
|
||||
queueName: 'workflow-execution',
|
||||
bullmqJobName: 'workflow-execution',
|
||||
metadata: {
|
||||
executionId: 'execution-123',
|
||||
}),
|
||||
expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
workflowId: 'workflow-1',
|
||||
userId: 'actor-1',
|
||||
correlation: {
|
||||
executionId: 'execution-123',
|
||||
requestId: 'req-12345678',
|
||||
source: 'workflow',
|
||||
workflowId: 'workflow-1',
|
||||
triggerType: 'manual',
|
||||
},
|
||||
},
|
||||
workspaceId: 'workspace-1',
|
||||
}),
|
||||
})
|
||||
)
|
||||
})
|
||||
|
||||
@@ -4,8 +4,7 @@ import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
|
||||
import { z } from 'zod'
|
||||
import { AuthType, checkHybridAuth, hasExternalApiCredentials } from '@/lib/auth/hybrid'
|
||||
import { admissionRejectedResponse, tryAdmit } from '@/lib/core/admission/gate'
|
||||
import { getJobQueue, shouldExecuteInline, shouldUseBullMQ } from '@/lib/core/async-jobs'
|
||||
import { createBullMQJobData } from '@/lib/core/bullmq'
|
||||
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
|
||||
import {
|
||||
createTimeoutAbortController,
|
||||
getTimeoutErrorMessage,
|
||||
@@ -14,13 +13,6 @@ import {
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { SSE_HEADERS } from '@/lib/core/utils/sse'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
import {
|
||||
DispatchQueueFullError,
|
||||
enqueueWorkspaceDispatch,
|
||||
type WorkspaceDispatchLane,
|
||||
waitForDispatchJob,
|
||||
} from '@/lib/core/workspace-dispatch'
|
||||
import { createBufferedExecutionStream } from '@/lib/execution/buffered-stream'
|
||||
import {
|
||||
buildNextCallChain,
|
||||
parseCallChain,
|
||||
@@ -42,11 +34,6 @@ import {
|
||||
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
|
||||
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
|
||||
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
|
||||
import {
|
||||
DIRECT_WORKFLOW_JOB_NAME,
|
||||
type QueuedWorkflowExecutionPayload,
|
||||
type QueuedWorkflowExecutionResult,
|
||||
} from '@/lib/workflows/executor/queued-workflow-execution'
|
||||
import {
|
||||
loadDeployedWorkflowState,
|
||||
loadWorkflowFromNormalizedTables,
|
||||
@@ -118,8 +105,6 @@ const ExecuteWorkflowSchema = z.object({
|
||||
export const runtime = 'nodejs'
|
||||
export const dynamic = 'force-dynamic'
|
||||
|
||||
const INLINE_TRIGGER_TYPES = new Set<CoreTriggerType>(['manual', 'workflow'])
|
||||
|
||||
function resolveOutputIds(
|
||||
selectedOutputs: string[] | undefined,
|
||||
blocks: Record<string, any>
|
||||
@@ -216,39 +201,19 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
|
||||
}
|
||||
|
||||
try {
|
||||
const useBullMQ = shouldUseBullMQ()
|
||||
const jobQueue = useBullMQ ? null : await getJobQueue()
|
||||
const jobId = useBullMQ
|
||||
? await enqueueWorkspaceDispatch({
|
||||
id: executionId,
|
||||
workspaceId,
|
||||
lane: 'runtime',
|
||||
queueName: 'workflow-execution',
|
||||
bullmqJobName: 'workflow-execution',
|
||||
bullmqPayload: createBullMQJobData(payload, {
|
||||
workflowId,
|
||||
userId,
|
||||
correlation,
|
||||
}),
|
||||
metadata: {
|
||||
workflowId,
|
||||
userId,
|
||||
correlation,
|
||||
},
|
||||
})
|
||||
: await jobQueue!.enqueue('workflow-execution', payload, {
|
||||
metadata: { workflowId, workspaceId, userId, correlation },
|
||||
})
|
||||
const jobQueue = await getJobQueue()
|
||||
const jobId = await jobQueue.enqueue('workflow-execution', payload, {
|
||||
metadata: { workflowId, workspaceId, userId, correlation },
|
||||
})
|
||||
|
||||
asyncLogger.info('Queued async workflow execution', { jobId })
|
||||
|
||||
if (shouldExecuteInline() && jobQueue) {
|
||||
const inlineJobQueue = jobQueue
|
||||
if (shouldExecuteInline()) {
|
||||
void (async () => {
|
||||
try {
|
||||
await inlineJobQueue.startJob(jobId)
|
||||
await jobQueue.startJob(jobId)
|
||||
const output = await executeWorkflowJob(payload)
|
||||
await inlineJobQueue.completeJob(jobId, output)
|
||||
await jobQueue.completeJob(jobId, output)
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error)
|
||||
asyncLogger.error('Async workflow execution failed', {
|
||||
@@ -256,7 +221,7 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
|
||||
error: errorMessage,
|
||||
})
|
||||
try {
|
||||
await inlineJobQueue.markJobFailed(jobId, errorMessage)
|
||||
await jobQueue.markJobFailed(jobId, errorMessage)
|
||||
} catch (markFailedError) {
|
||||
asyncLogger.error('Failed to mark job as failed', {
|
||||
jobId,
|
||||
@@ -282,17 +247,6 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
|
||||
{ status: 202 }
|
||||
)
|
||||
} catch (error: any) {
|
||||
if (error instanceof DispatchQueueFullError) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
error: 'Service temporarily at capacity',
|
||||
message: error.message,
|
||||
retryAfterSeconds: 10,
|
||||
},
|
||||
{ status: 503, headers: { 'Retry-After': '10' } }
|
||||
)
|
||||
}
|
||||
|
||||
asyncLogger.error('Failed to queue async execution', error)
|
||||
return NextResponse.json(
|
||||
{ error: `Failed to queue async execution: ${error.message}` },
|
||||
@@ -301,31 +255,6 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
|
||||
}
|
||||
}
|
||||
|
||||
async function enqueueDirectWorkflowExecution(
|
||||
payload: QueuedWorkflowExecutionPayload,
|
||||
priority: number,
|
||||
lane: WorkspaceDispatchLane
|
||||
) {
|
||||
return enqueueWorkspaceDispatch({
|
||||
id: payload.metadata.executionId,
|
||||
workspaceId: payload.metadata.workspaceId,
|
||||
lane,
|
||||
queueName: 'workflow-execution',
|
||||
bullmqJobName: DIRECT_WORKFLOW_JOB_NAME,
|
||||
bullmqPayload: createBullMQJobData(payload, {
|
||||
workflowId: payload.metadata.workflowId,
|
||||
userId: payload.metadata.userId,
|
||||
correlation: payload.metadata.correlation,
|
||||
}),
|
||||
metadata: {
|
||||
workflowId: payload.metadata.workflowId,
|
||||
userId: payload.metadata.userId,
|
||||
correlation: payload.metadata.correlation,
|
||||
},
|
||||
priority,
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* POST /api/workflows/[id]/execute
|
||||
*
|
||||
@@ -793,92 +722,6 @@ async function handleExecutePost(
|
||||
|
||||
const executionVariables = cachedWorkflowData?.variables ?? workflow.variables ?? {}
|
||||
|
||||
if (shouldUseBullMQ() && !INLINE_TRIGGER_TYPES.has(triggerType)) {
|
||||
try {
|
||||
const dispatchJobId = await enqueueDirectWorkflowExecution(
|
||||
{
|
||||
workflow,
|
||||
metadata,
|
||||
input: processedInput,
|
||||
variables: executionVariables,
|
||||
selectedOutputs,
|
||||
includeFileBase64,
|
||||
base64MaxBytes,
|
||||
stopAfterBlockId,
|
||||
timeoutMs: preprocessResult.executionTimeout?.sync,
|
||||
runFromBlock: resolvedRunFromBlock,
|
||||
},
|
||||
5,
|
||||
'interactive'
|
||||
)
|
||||
|
||||
const resultRecord = await waitForDispatchJob(
|
||||
dispatchJobId,
|
||||
(preprocessResult.executionTimeout?.sync ?? 300000) + 30000
|
||||
)
|
||||
|
||||
if (resultRecord.status === 'failed') {
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
executionId,
|
||||
error: resultRecord.error ?? 'Workflow execution failed',
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
}
|
||||
|
||||
const result = resultRecord.output as QueuedWorkflowExecutionResult
|
||||
|
||||
const resultForResponseBlock = {
|
||||
success: result.success,
|
||||
logs: result.logs,
|
||||
output: result.output,
|
||||
}
|
||||
|
||||
if (
|
||||
auth.authType !== AuthType.INTERNAL_JWT &&
|
||||
workflowHasResponseBlock(resultForResponseBlock)
|
||||
) {
|
||||
return createHttpResponseFromBlock(resultForResponseBlock)
|
||||
}
|
||||
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: result.success,
|
||||
executionId,
|
||||
output: result.output,
|
||||
error: result.error,
|
||||
metadata: result.metadata,
|
||||
},
|
||||
{ status: result.statusCode ?? 200 }
|
||||
)
|
||||
} catch (error: unknown) {
|
||||
if (error instanceof DispatchQueueFullError) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
error: 'Service temporarily at capacity',
|
||||
message: error.message,
|
||||
retryAfterSeconds: 10,
|
||||
},
|
||||
{ status: 503, headers: { 'Retry-After': '10' } }
|
||||
)
|
||||
}
|
||||
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
|
||||
reqLogger.error(`Queued non-SSE execution failed: ${errorMessage}`)
|
||||
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
error: errorMessage,
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
const timeoutController = createTimeoutAbortController(
|
||||
preprocessResult.executionTimeout?.sync
|
||||
)
|
||||
@@ -993,53 +836,6 @@ async function handleExecutePost(
|
||||
}
|
||||
|
||||
if (shouldUseDraftState) {
|
||||
const shouldDispatchViaQueue = shouldUseBullMQ() && !INLINE_TRIGGER_TYPES.has(triggerType)
|
||||
if (shouldDispatchViaQueue) {
|
||||
const metadata: ExecutionMetadata = {
|
||||
requestId,
|
||||
executionId,
|
||||
workflowId,
|
||||
workspaceId,
|
||||
userId: actorUserId,
|
||||
sessionUserId: isClientSession ? userId : undefined,
|
||||
workflowUserId: workflow.userId,
|
||||
triggerType,
|
||||
useDraftState: shouldUseDraftState,
|
||||
startTime: new Date().toISOString(),
|
||||
isClientSession,
|
||||
enforceCredentialAccess: useAuthenticatedUserAsActor,
|
||||
workflowStateOverride: effectiveWorkflowStateOverride,
|
||||
callChain,
|
||||
}
|
||||
|
||||
const executionVariables = cachedWorkflowData?.variables ?? workflow.variables ?? {}
|
||||
|
||||
await enqueueDirectWorkflowExecution(
|
||||
{
|
||||
workflow,
|
||||
metadata,
|
||||
input: processedInput,
|
||||
variables: executionVariables,
|
||||
selectedOutputs,
|
||||
includeFileBase64,
|
||||
base64MaxBytes,
|
||||
stopAfterBlockId,
|
||||
timeoutMs: preprocessResult.executionTimeout?.sync,
|
||||
runFromBlock: resolvedRunFromBlock,
|
||||
streamEvents: true,
|
||||
},
|
||||
1,
|
||||
'interactive'
|
||||
)
|
||||
|
||||
return new NextResponse(createBufferedExecutionStream(executionId), {
|
||||
headers: {
|
||||
...SSE_HEADERS,
|
||||
'X-Execution-Id': executionId,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
reqLogger.info('Using SSE console log streaming (manual execution)')
|
||||
} else {
|
||||
reqLogger.info('Using streaming API response')
|
||||
@@ -1505,17 +1301,6 @@ async function handleExecutePost(
|
||||
},
|
||||
})
|
||||
} catch (error: any) {
|
||||
if (error instanceof DispatchQueueFullError) {
|
||||
return NextResponse.json(
|
||||
{
|
||||
error: 'Service temporarily at capacity',
|
||||
message: error.message,
|
||||
retryAfterSeconds: 10,
|
||||
},
|
||||
{ status: 503, headers: { 'Retry-After': '10' } }
|
||||
)
|
||||
}
|
||||
|
||||
reqLogger.error('Failed to start workflow execution:', error)
|
||||
return NextResponse.json(
|
||||
{ error: error.message || 'Failed to start workflow execution' },
|
||||
|
||||
@@ -3,7 +3,6 @@ import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { checkHybridAuth } from '@/lib/auth/hybrid'
|
||||
import { markExecutionCancelled } from '@/lib/execution/cancellation'
|
||||
import { abortManualExecution } from '@/lib/execution/manual-cancellation'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
|
||||
|
||||
const logger = createLogger('CancelExecutionAPI')
|
||||
@@ -61,16 +60,6 @@ export async function POST(
|
||||
})
|
||||
}
|
||||
|
||||
if (cancellation.durablyRecorded || locallyAborted) {
|
||||
const workspaceId = workflowAuthorization.workflow?.workspaceId
|
||||
captureServerEvent(
|
||||
auth.userId,
|
||||
'workflow_execution_cancelled',
|
||||
{ workflow_id: workflowId, workspace_id: workspaceId ?? '' },
|
||||
workspaceId ? { groups: { workspace: workspaceId } } : undefined
|
||||
)
|
||||
}
|
||||
|
||||
return NextResponse.json({
|
||||
success: cancellation.durablyRecorded || locallyAborted,
|
||||
executionId,
|
||||
|
||||
@@ -3,7 +3,6 @@ import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { restoreWorkflow } from '@/lib/workflows/lifecycle'
|
||||
import { getWorkflowById } from '@/lib/workflows/utils'
|
||||
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
|
||||
@@ -59,13 +58,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
request,
|
||||
})
|
||||
|
||||
captureServerEvent(
|
||||
auth.userId,
|
||||
'workflow_restored',
|
||||
{ workflow_id: workflowId, workspace_id: workflowData.workspaceId ?? '' },
|
||||
workflowData.workspaceId ? { groups: { workspace: workflowData.workspaceId } } : undefined
|
||||
)
|
||||
|
||||
return NextResponse.json({ success: true })
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error restoring workflow ${workflowId}`, error)
|
||||
|
||||
@@ -6,7 +6,6 @@ import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { AuthType, checkHybridAuth, checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { performDeleteWorkflow } from '@/lib/workflows/orchestration'
|
||||
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
|
||||
import { authorizeWorkflowByWorkspacePermission, getWorkflowById } from '@/lib/workflows/utils'
|
||||
@@ -89,6 +88,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
|
||||
const finalWorkflowData = {
|
||||
...workflowData,
|
||||
state: {
|
||||
deploymentStatuses: {},
|
||||
blocks: normalizedData.blocks,
|
||||
edges: normalizedData.edges,
|
||||
loops: normalizedData.loops,
|
||||
@@ -114,6 +114,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
|
||||
const emptyWorkflowData = {
|
||||
...workflowData,
|
||||
state: {
|
||||
deploymentStatuses: {},
|
||||
blocks: {},
|
||||
edges: [],
|
||||
loops: {},
|
||||
@@ -224,13 +225,6 @@ export async function DELETE(
|
||||
return NextResponse.json({ error: result.error }, { status })
|
||||
}
|
||||
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'workflow_deleted',
|
||||
{ workflow_id: workflowId, workspace_id: workflowData.workspaceId ?? '' },
|
||||
workflowData.workspaceId ? { groups: { workspace: workflowData.workspaceId } } : undefined
|
||||
)
|
||||
|
||||
const elapsed = Date.now() - startTime
|
||||
logger.info(`[${requestId}] Successfully archived workflow ${workflowId} in ${elapsed}ms`)
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
|
||||
import type { Variable } from '@/stores/variables/types'
|
||||
import type { Variable } from '@/stores/panel/variables/types'
|
||||
|
||||
const logger = createLogger('WorkflowVariablesAPI')
|
||||
|
||||
|
||||
@@ -7,7 +7,6 @@ import { z } from 'zod'
|
||||
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { getNextWorkflowColor } from '@/lib/workflows/colors'
|
||||
import { buildDefaultWorkflowArtifacts } from '@/lib/workflows/defaults'
|
||||
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/persistence/utils'
|
||||
@@ -275,16 +274,6 @@ export async function POST(req: NextRequest) {
|
||||
|
||||
logger.info(`[${requestId}] Successfully created workflow ${workflowId} with default blocks`)
|
||||
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'workflow_created',
|
||||
{ workflow_id: workflowId, workspace_id: workspaceId ?? '', name },
|
||||
{
|
||||
groups: workspaceId ? { workspace: workspaceId } : undefined,
|
||||
setOnce: { first_workflow_created_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId,
|
||||
actorId: userId,
|
||||
|
||||
@@ -7,7 +7,6 @@ import { z } from 'zod'
|
||||
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
|
||||
|
||||
const logger = createLogger('WorkspaceApiKeyAPI')
|
||||
@@ -146,13 +145,6 @@ export async function DELETE(
|
||||
|
||||
const deletedKey = deletedRows[0]
|
||||
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'api_key_revoked',
|
||||
{ workspace_id: workspaceId, key_name: deletedKey.name },
|
||||
{ groups: { workspace: workspaceId } }
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId,
|
||||
actorId: userId,
|
||||
|
||||
@@ -10,14 +10,12 @@ import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { PlatformEvents } from '@/lib/core/telemetry'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { getUserEntityPermissions, getWorkspaceById } from '@/lib/workspaces/permissions/utils'
|
||||
|
||||
const logger = createLogger('WorkspaceApiKeysAPI')
|
||||
|
||||
const CreateKeySchema = z.object({
|
||||
name: z.string().trim().min(1, 'Name is required'),
|
||||
source: z.enum(['settings', 'deploy_modal']).optional(),
|
||||
})
|
||||
|
||||
const DeleteKeysSchema = z.object({
|
||||
@@ -103,7 +101,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
}
|
||||
|
||||
const body = await request.json()
|
||||
const { name, source } = CreateKeySchema.parse(body)
|
||||
const { name } = CreateKeySchema.parse(body)
|
||||
|
||||
const existingKey = await db
|
||||
.select()
|
||||
@@ -160,16 +158,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
// Telemetry should not fail the operation
|
||||
}
|
||||
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'api_key_created',
|
||||
{ workspace_id: workspaceId, key_name: name, source },
|
||||
{
|
||||
groups: { workspace: workspaceId },
|
||||
setOnce: { first_api_key_created_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
logger.info(`[${requestId}] Created workspace API key: ${name} in workspace ${workspaceId}`)
|
||||
|
||||
recordAudit({
|
||||
|
||||
@@ -9,7 +9,6 @@ import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { decryptSecret, encryptSecret } from '@/lib/core/security/encryption'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { getUserEntityPermissions, getWorkspaceById } from '@/lib/workspaces/permissions/utils'
|
||||
|
||||
const logger = createLogger('WorkspaceBYOKKeysAPI')
|
||||
@@ -202,16 +201,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
|
||||
logger.info(`[${requestId}] Created BYOK key for ${providerId} in workspace ${workspaceId}`)
|
||||
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'byok_key_added',
|
||||
{ workspace_id: workspaceId, provider_id: providerId },
|
||||
{
|
||||
groups: { workspace: workspaceId },
|
||||
setOnce: { first_byok_key_added_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId,
|
||||
actorId: userId,
|
||||
@@ -283,13 +272,6 @@ export async function DELETE(
|
||||
|
||||
logger.info(`[${requestId}] Deleted BYOK key for ${providerId} from workspace ${workspaceId}`)
|
||||
|
||||
captureServerEvent(
|
||||
userId,
|
||||
'byok_key_removed',
|
||||
{ workspace_id: workspaceId, provider_id: providerId },
|
||||
{ groups: { workspace: workspaceId } }
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId,
|
||||
actorId: userId,
|
||||
|
||||
@@ -3,7 +3,6 @@ import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import {
|
||||
FileConflictError,
|
||||
listWorkspaceFiles,
|
||||
@@ -117,13 +116,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
|
||||
logger.info(`[${requestId}] Uploaded workspace file: ${fileName}`)
|
||||
|
||||
captureServerEvent(
|
||||
session.user.id,
|
||||
'file_uploaded',
|
||||
{ workspace_id: workspaceId, file_type: rawFile.type || 'application/octet-stream' },
|
||||
{ groups: { workspace: workspaceId } }
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId,
|
||||
actorId: session.user.id,
|
||||
|
||||
@@ -7,7 +7,6 @@ import { z } from 'zod'
|
||||
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { encryptSecret } from '@/lib/core/security/encryption'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
|
||||
import { MAX_EMAIL_RECIPIENTS, MAX_WORKFLOW_IDS } from '../constants'
|
||||
|
||||
@@ -343,17 +342,6 @@ export async function DELETE(request: NextRequest, { params }: RouteParams) {
|
||||
request,
|
||||
})
|
||||
|
||||
captureServerEvent(
|
||||
session.user.id,
|
||||
'notification_channel_deleted',
|
||||
{
|
||||
notification_id: notificationId,
|
||||
notification_type: deletedSubscription.notificationType,
|
||||
workspace_id: workspaceId,
|
||||
},
|
||||
{ groups: { workspace: workspaceId } }
|
||||
)
|
||||
|
||||
return NextResponse.json({ success: true })
|
||||
} catch (error) {
|
||||
logger.error('Error deleting notification', { error })
|
||||
|
||||
@@ -8,7 +8,6 @@ import { z } from 'zod'
|
||||
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { encryptSecret } from '@/lib/core/security/encryption'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
|
||||
import { MAX_EMAIL_RECIPIENTS, MAX_NOTIFICATIONS_PER_TYPE, MAX_WORKFLOW_IDS } from './constants'
|
||||
|
||||
@@ -257,17 +256,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
|
||||
type: data.notificationType,
|
||||
})
|
||||
|
||||
captureServerEvent(
|
||||
session.user.id,
|
||||
'notification_channel_created',
|
||||
{
|
||||
workspace_id: workspaceId,
|
||||
notification_type: data.notificationType,
|
||||
alert_rule: data.alertConfig?.rule ?? null,
|
||||
},
|
||||
{ groups: { workspace: workspaceId } }
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId,
|
||||
actorId: session.user.id,
|
||||
|
||||
@@ -8,7 +8,6 @@ import { z } from 'zod'
|
||||
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { syncWorkspaceEnvCredentials } from '@/lib/credentials/environment'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import {
|
||||
getUsersWithPermissions,
|
||||
hasWorkspaceAdminAccess,
|
||||
@@ -189,13 +188,6 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
|
||||
const updatedUsers = await getUsersWithPermissions(workspaceId)
|
||||
|
||||
for (const update of body.updates) {
|
||||
captureServerEvent(
|
||||
session.user.id,
|
||||
'workspace_member_role_changed',
|
||||
{ workspace_id: workspaceId, new_role: update.permissions },
|
||||
{ groups: { workspace: workspaceId } }
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId,
|
||||
actorId: session.user.id,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { generatePptxFromCode } from '@/lib/execution/pptx-vm'
|
||||
import { generatePptxFromCode } from '@/lib/execution/doc-vm'
|
||||
import { verifyWorkspaceMembership } from '@/app/api/workflows/utils'
|
||||
|
||||
export const dynamic = 'force-dynamic'
|
||||
|
||||
@@ -5,7 +5,6 @@ import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { archiveWorkspace } from '@/lib/workspaces/lifecycle'
|
||||
|
||||
const logger = createLogger('WorkspaceByIdAPI')
|
||||
@@ -293,13 +292,6 @@ export async function DELETE(
|
||||
request,
|
||||
})
|
||||
|
||||
captureServerEvent(
|
||||
session.user.id,
|
||||
'workspace_deleted',
|
||||
{ workspace_id: workspaceId, workflow_count: workflowIds.length },
|
||||
{ groups: { workspace: workspaceId } }
|
||||
)
|
||||
|
||||
return NextResponse.json({ success: true })
|
||||
} catch (error) {
|
||||
logger.error(`Error deleting workspace ${workspaceId}:`, error)
|
||||
|
||||
@@ -19,7 +19,6 @@ import { PlatformEvents } from '@/lib/core/telemetry'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
import { sendEmail } from '@/lib/messaging/email/mailer'
|
||||
import { getFromEmailAddress } from '@/lib/messaging/email/utils'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { getWorkspaceById } from '@/lib/workspaces/permissions/utils'
|
||||
import {
|
||||
InvitationsNotAllowedError,
|
||||
@@ -215,16 +214,6 @@ export async function POST(req: NextRequest) {
|
||||
// Telemetry should not fail the operation
|
||||
}
|
||||
|
||||
captureServerEvent(
|
||||
session.user.id,
|
||||
'workspace_member_invited',
|
||||
{ workspace_id: workspaceId, invitee_role: permission },
|
||||
{
|
||||
groups: { workspace: workspaceId },
|
||||
setOnce: { first_invitation_sent_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
await sendInvitationEmail({
|
||||
to: email,
|
||||
inviterName: session.user.name || session.user.email || 'A user',
|
||||
|
||||
@@ -7,7 +7,6 @@ import { z } from 'zod'
|
||||
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { revokeWorkspaceCredentialMemberships } from '@/lib/credentials/access'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { hasWorkspaceAdminAccess } from '@/lib/workspaces/permissions/utils'
|
||||
|
||||
const logger = createLogger('WorkspaceMemberAPI')
|
||||
@@ -106,13 +105,6 @@ export async function DELETE(req: NextRequest, { params }: { params: Promise<{ i
|
||||
|
||||
await revokeWorkspaceCredentialMemberships(workspaceId, userId)
|
||||
|
||||
captureServerEvent(
|
||||
session.user.id,
|
||||
'workspace_member_removed',
|
||||
{ workspace_id: workspaceId, is_self_removal: isSelf },
|
||||
{ groups: { workspace: workspaceId } }
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId,
|
||||
actorId: session.user.id,
|
||||
|
||||
@@ -7,7 +7,6 @@ import { z } from 'zod'
|
||||
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { PlatformEvents } from '@/lib/core/telemetry'
|
||||
import { captureServerEvent } from '@/lib/posthog/server'
|
||||
import { buildDefaultWorkflowArtifacts } from '@/lib/workflows/defaults'
|
||||
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/persistence/utils'
|
||||
import { getRandomWorkspaceColor } from '@/lib/workspaces/colors'
|
||||
@@ -97,16 +96,6 @@ export async function POST(req: Request) {
|
||||
|
||||
const newWorkspace = await createWorkspace(session.user.id, name, skipDefaultWorkflow, color)
|
||||
|
||||
captureServerEvent(
|
||||
session.user.id,
|
||||
'workspace_created',
|
||||
{ workspace_id: newWorkspace.id, name: newWorkspace.name },
|
||||
{
|
||||
groups: { workspace: newWorkspace.id },
|
||||
setOnce: { first_workspace_created_at: new Date().toISOString() },
|
||||
}
|
||||
)
|
||||
|
||||
recordAudit({
|
||||
workspaceId: newWorkspace.id,
|
||||
actorId: session.user.id,
|
||||
|
||||
@@ -90,7 +90,6 @@ export default function RootLayout({ children }: { children: React.ReactNode })
|
||||
}
|
||||
|
||||
// Sidebar width
|
||||
var defaultSidebarWidth = '248px';
|
||||
try {
|
||||
var stored = localStorage.getItem('sidebar-state');
|
||||
if (stored) {
|
||||
@@ -109,15 +108,11 @@ export default function RootLayout({ children }: { children: React.ReactNode })
|
||||
document.documentElement.style.setProperty('--sidebar-width', width + 'px');
|
||||
} else if (width > maxSidebarWidth) {
|
||||
document.documentElement.style.setProperty('--sidebar-width', maxSidebarWidth + 'px');
|
||||
} else {
|
||||
document.documentElement.style.setProperty('--sidebar-width', defaultSidebarWidth);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
document.documentElement.style.setProperty('--sidebar-width', defaultSidebarWidth);
|
||||
}
|
||||
} catch (e) {
|
||||
document.documentElement.style.setProperty('--sidebar-width', defaultSidebarWidth);
|
||||
// Fallback handled by CSS defaults
|
||||
}
|
||||
|
||||
// Panel width and active tab
|
||||
|
||||
@@ -108,6 +108,8 @@ function normalizeWorkflowState(input?: any): WorkflowState | null {
|
||||
lastUpdate: input.lastUpdate,
|
||||
metadata: input.metadata,
|
||||
variables: input.variables,
|
||||
deploymentStatuses: input.deploymentStatuses,
|
||||
needsRedeployment: input.needsRedeployment,
|
||||
dragStartPosition: input.dragStartPosition ?? null,
|
||||
}
|
||||
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user