Compare commits

..

11 Commits

Author SHA1 Message Date
Zamil Majdy
0058cd3ba6 fix(frontend): auto-poll for long-running tool completion (#11866)
## Summary
Fixes the issue where the "Creating Agent" spinner doesn't auto-update
when agent generation completes - user had to refresh the browser.

**Changes:**
- **Frontend polling**: Add `onOperationStarted` callback to trigger
polling when `operation_started` is received via SSE
- **Polling backoff**: 2s, 4s, 6s, 8s... up to 30s max
- **Message deduplication**: Use content-based keys (role + content)
instead of timestamps to prevent duplicate messages
- **Message ordering**: Preserve server message order instead of
timestamp-based sorting
- **Debug cleanup**: Remove verbose console.log/console.info statements

## Test plan
- [ ] Start agent generation in copilot
- [ ] Verify "Creating Agent" spinner appears
- [ ] Wait for completion (2-5 min) WITHOUT refreshing
- [ ] Verify agent carousel appears automatically when done
- [ ] Verify no duplicate messages in chat
- [ ] Verify message order is correct (user → assistant → tool_call →
tool_response)
2026-01-28 10:03:21 +07:00
Nicholas Tindle
ea035224bc feat(copilot): Increase max_agent_runs and max_agent_schedules (#11865)
<!-- Clearly explain the need for these changes: -->
Config change to increase the max times an agent can run in the chat and
the max number of scheduels created by copilot in one chat

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> Increases per-chat operational limits for Copilot.
> 
> - Bumps `max_agent_runs` default from `3` to `30` in `ChatConfig`
> - Bumps `max_agent_schedules` default from `3` to `30` in `ChatConfig`
> 
> <sup>Written by [Cursor
Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit
93cbae6d27. This will update automatically
on new commits. Configure
[here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->
2026-01-28 01:08:02 +00:00
Nicholas Tindle
62813a1ea6 Delete backend/blocks/video/__init__.py (#11864)
<!-- Clearly explain the need for these changes: -->
oops file
### Changes 🏗️

<!-- Concisely describe all of the changes made in this pull request:
-->
removes file that should have not been commited

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> Removes erroneous `backend/blocks/video/__init__.py`, eliminating an
unintended `video` package.
> 
> - Deletes a placeholder comment-only file
> 
> <sup>Written by [Cursor
Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit
3b84576c33. This will update automatically
on new commits. Configure
[here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->
2026-01-28 00:58:49 +00:00
Bently
67405f7eb9 fix(copilot): ensure tool_call/tool_response pairs stay intact during context compaction (#11863)
## Summary

Fixes context compaction breaking tool_call/tool_response pairs, causing
API validation errors.

## Problem

When context compaction slices messages with `messages[-KEEP_RECENT:]`,
a naive slice can separate an assistant message containing `tool_calls`
from its corresponding tool response messages. This causes API
validation errors like:

```
messages.0.content.1: unexpected 'tool_use_id' found in 'tool_result' blocks: orphan_12345.
Each 'tool_result' block must have a corresponding 'tool_use' block in the previous message.
```

## Solution

Added `_ensure_tool_pairs_intact()` helper function that:
1. Detects orphan tool responses in a slice (tool messages whose
`tool_call_id` has no matching assistant message)
2. Extends the slice backwards to include the missing assistant messages
3. Falls back to removing orphan tool responses if the assistant cannot
be found (edge case)

Applied this safeguard to:
- The initial `KEEP_RECENT` slice (line ~990)
- The progressive fallback slices when still over token limit (line
~1079)

## Testing

- Syntax validated with `python -m py_compile`
- Logic reviewed for correctness

## Linear

Fixes SECRT-1839

---
*Debugged by Toran & Orion in #agpt Discord*
2026-01-28 00:21:54 +00:00
Zamil Majdy
171ff6e776 feat(backend): persist long-running tool results to survive SSE disconnects (#11856)
## Summary

Agent generation (`create_agent`, `edit_agent`) can take 1-5 minutes.
Previously, if the user closed their browser tab during this time:
1. The SSE connection would die
2. The tool execution would be cancelled via `CancelledError`
3. The result would be lost - even if the agent-generator service
completed successfully

This PR ensures long-running tool operations survive SSE disconnections.

### Changes 🏗️

**Backend:**
- **base.py**: Added `is_long_running` property to `BaseTool` for tools
to opt-in to background execution
- **create_agent.py / edit_agent.py**: Set `is_long_running = True`
- **models.py**: Added `OperationStartedResponse`,
`OperationPendingResponse`, `OperationInProgressResponse` types
- **service.py**: Modified `_yield_tool_call()` to:
  - Check if tool is `is_long_running`
  - Save "pending" message to chat history immediately
  - Spawn background task that runs independently of SSE
  - Return `operation_started` immediately (don't wait)
  - Update chat history with result when background task completes
- Track running operations for idempotency (prevents duplicate ops on
refresh)
- **db.py**: Added `update_tool_message_content()` to update pending
messages
- **model.py**: Added `invalidate_session_cache()` to clear Redis after
background completion

**Frontend:**
- **useChatMessage.ts**: Added operation message types
- **helpers.ts**: Handle `operation_started`, `operation_pending`,
`operation_in_progress` response types
- **PendingOperationWidget**: New component to display operation status
with spinner
- **ChatMessage.tsx**: Render `PendingOperationWidget` for operation
messages

### How It Works

```
User Request → Save "pending" message → Spawn background task → Return immediately
                                              ↓
                                     Task runs independently of SSE
                                              ↓
                                     On completion: Update message in chat history
                                              ↓
                                     User refreshes → Loads history → Sees result
```

### User Experience

1. User requests agent creation
2. Sees "Agent creation started. You can close this tab - check your
library in a few minutes."
3. Can close browser tab safely
4. When they return, chat shows the completed result (or error)

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] pyright passes (0 errors)
  - [x] TypeScript checks pass
  - [x] Formatters applied

### Test Plan

1. Start agent creation in copilot
2. Close browser tab immediately after seeing "operation_started" 
3. Wait 2-3 minutes
4. Reopen chat
5. Verify: Chat history shows completion message and agent appears in
library

---------

Co-authored-by: Ubbe <hi@ubbe.dev>
2026-01-28 05:09:34 +07:00
Lluis Agusti
349b1f9c79 hotfix(frontend): copilot session handling refinements... 2026-01-28 02:53:45 +07:00
Lluis Agusti
277b0537e9 hotfix(frontend): copilot simplication... 2026-01-28 02:10:18 +07:00
Ubbe
071b3bb5cd fix(frontend): more copilot refinements (#11858)
## Changes 🏗️

On the **Copilot** page:

- prevent unnecessary sidebar repaints 
- show a disclaimer when switching chats on the sidebar to terminate a
current stream
- handle loading better
- save streams better when disconnecting


### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Run the app locally and test the above
2026-01-28 00:49:28 +07:00
Swifty
2134d777be fix(backend): exclude disabled blocks from chat search and indexing (#11854)
## Summary

Disabled blocks (e.g., webhook blocks without `platform_base_url`
configured) were being indexed and returned in chat tool search results.
This PR ensures they are properly filtered out.

### Changes 🏗️

- **find_block.py**: Skip disabled blocks when enriching search results
- **content_handlers.py**: 
  - Skip disabled blocks during embedding indexing
- Update `get_stats()` to only count enabled blocks for accurate
coverage metrics

### Why

Blocks can be disabled for various reasons (missing OAuth config, no
platform URL for webhooks, etc.). These blocks shouldn't appear in
search results since users cannot use them.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Verified disabled blocks are filtered from search results
  - [x] Verified disabled blocks are not indexed
  - [x] Verified stats accurately reflect enabled block count
2026-01-27 15:21:13 +00:00
Ubbe
962824c8af refactor(frontend): copilot session management stream updates (#11853)
## Changes 🏗️

- **Fix infinite loop in copilot page**
- use Zustand selectors instead of full store object to get stable
function references
- **Centralize chat streaming logic**
- move all streaming files from `providers/chat-stream/` to
`components/contextual/Chat/` for better colocation and reusability
- **Rename `copilot-store` → `copilot-page-store`**: Clarify scope
- **Fix message duplication**
- Only replay chunks from active streams (not completed ones) since
backend already provides persisted messages in `initialMessages`
- **Auto-focus chat input**
  - Focus textarea when streaming ends and input is re-enabled
- **Graceful error display**
- Render tool response errors in muted style (small text + warning icon)
instead of raw "Error: ..." text

## Checklist 📋

### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Navigate to copilot page - no infinite loop errors
  - [x] Start a new chat, send message, verify streaming works
- [x] Navigate away and back to a completed session - no duplicate
messages
  - [x] After stream completes, verify chat input receives focus
  - [x] Trigger a tool error - verify it displays with muted styling
2026-01-27 22:09:25 +07:00
Zamil Majdy
3e9d5d0d50 fix(backend): handle race condition in review processing gracefully (#11845)
## Summary
- Fixes race condition when multiple concurrent requests try to process
the same reviews (e.g., double-click, multiple browser tabs)
- Previously the second request would fail with "Reviews not found,
access denied, or not in WAITING status"
- Now handles this gracefully by treating already-processed reviews with
the same decision as success

## Changes
- Added `get_reviews_by_node_exec_ids()` function that fetches reviews
regardless of status
- Modified `process_all_reviews_for_execution()` to handle
already-processed reviews
- Updated route to use idempotent validation

## Test plan
- [x] Linter passes (`poetry run ruff check`)
- [x] Type checker passes (`poetry run pyright`)
- [x] Formatter passes (`poetry run format`)
- [ ] Manual testing: double-click approve button should not cause
errors

Fixes AUTOGPT-SERVER-7HE
2026-01-27 21:43:31 +07:00
63 changed files with 2993 additions and 1558 deletions

View File

@@ -33,9 +33,15 @@ class ChatConfig(BaseSettings):
stream_timeout: int = Field(default=300, description="Stream timeout in seconds")
max_retries: int = Field(default=3, description="Maximum number of retries")
max_agent_runs: int = Field(default=3, description="Maximum number of agent runs")
max_agent_runs: int = Field(default=30, description="Maximum number of agent runs")
max_agent_schedules: int = Field(
default=3, description="Maximum number of agent schedules"
default=30, description="Maximum number of agent schedules"
)
# Long-running operation configuration
long_running_operation_ttl: int = Field(
default=600,
description="TTL in seconds for long-running operation tracking in Redis (safety net if pod dies)",
)
# Langfuse Prompt Management Configuration

View File

@@ -247,3 +247,45 @@ async def get_chat_session_message_count(session_id: str) -> int:
"""Get the number of messages in a chat session."""
count = await PrismaChatMessage.prisma().count(where={"sessionId": session_id})
return count
async def update_tool_message_content(
session_id: str,
tool_call_id: str,
new_content: str,
) -> bool:
"""Update the content of a tool message in chat history.
Used by background tasks to update pending operation messages with final results.
Args:
session_id: The chat session ID.
tool_call_id: The tool call ID to find the message.
new_content: The new content to set.
Returns:
True if a message was updated, False otherwise.
"""
try:
result = await PrismaChatMessage.prisma().update_many(
where={
"sessionId": session_id,
"toolCallId": tool_call_id,
},
data={
"content": new_content,
},
)
if result == 0:
logger.warning(
f"No message found to update for session {session_id}, "
f"tool_call_id {tool_call_id}"
)
return False
return True
except Exception as e:
logger.error(
f"Failed to update tool message for session {session_id}, "
f"tool_call_id {tool_call_id}: {e}"
)
return False

View File

@@ -295,6 +295,21 @@ async def cache_chat_session(session: ChatSession) -> None:
await _cache_session(session)
async def invalidate_session_cache(session_id: str) -> None:
"""Invalidate a chat session from Redis cache.
Used by background tasks to ensure fresh data is loaded on next access.
This is best-effort - Redis failures are logged but don't fail the operation.
"""
try:
redis_key = _get_session_cache_key(session_id)
async_redis = await get_redis_async()
await async_redis.delete(redis_key)
except Exception as e:
# Best-effort: log but don't fail - cache will expire naturally
logger.warning(f"Failed to invalidate session cache for {session_id}: {e}")
async def _get_session_from_db(session_id: str) -> ChatSession | None:
"""Get a chat session from the database."""
prisma_session = await chat_db.get_chat_session(session_id)

View File

@@ -17,6 +17,7 @@ from openai import (
)
from openai.types.chat import ChatCompletionChunk, ChatCompletionToolParam
from backend.data.redis_client import get_redis_async
from backend.data.understanding import (
format_understanding_for_prompt,
get_business_understanding,
@@ -24,6 +25,7 @@ from backend.data.understanding import (
from backend.util.exceptions import NotFoundError
from backend.util.settings import Settings
from . import db as chat_db
from .config import ChatConfig
from .model import (
ChatMessage,
@@ -31,6 +33,7 @@ from .model import (
Usage,
cache_chat_session,
get_chat_session,
invalidate_session_cache,
update_session_title,
upsert_chat_session,
)
@@ -48,8 +51,13 @@ from .response_model import (
StreamToolOutputAvailable,
StreamUsage,
)
from .tools import execute_tool, tools
from .tools.models import ErrorResponse
from .tools import execute_tool, get_tool, tools
from .tools.models import (
ErrorResponse,
OperationInProgressResponse,
OperationPendingResponse,
OperationStartedResponse,
)
from .tracking import track_user_message
logger = logging.getLogger(__name__)
@@ -61,11 +69,126 @@ client = openai.AsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
langfuse = get_client()
# Redis key prefix for tracking running long-running operations
# Used for idempotency across Kubernetes pods - prevents duplicate executions on browser refresh
RUNNING_OPERATION_PREFIX = "chat:running_operation:"
class LangfuseNotConfiguredError(Exception):
"""Raised when Langfuse is required but not configured."""
# Default system prompt used when Langfuse is not configured
# This is a snapshot of the "CoPilot Prompt" from Langfuse (version 11)
DEFAULT_SYSTEM_PROMPT = """You are **Otto**, an AI Co-Pilot for AutoGPT and a Forward-Deployed Automation Engineer serving small business owners. Your mission is to help users automate business tasks with AI by delivering tangible value through working automations—not through documentation or lengthy explanations.
pass
Here is everything you know about the current user from previous interactions:
<users_information>
{users_information}
</users_information>
## YOUR CORE MANDATE
You are action-oriented. Your success is measured by:
- **Value Delivery**: Does the user think "wow, that was amazing" or "what was the point"?
- **Demonstrable Proof**: Show working automations, not descriptions of what's possible
- **Time Saved**: Focus on tangible efficiency gains
- **Quality Output**: Deliver results that meet or exceed expectations
## YOUR WORKFLOW
Adapt flexibly to the conversation context. Not every interaction requires all stages:
1. **Explore & Understand**: Learn about the user's business, tasks, and goals. Use `add_understanding` to capture important context that will improve future conversations.
2. **Assess Automation Potential**: Help the user understand whether and how AI can automate their task.
3. **Prepare for AI**: Provide brief, actionable guidance on prerequisites (data, access, etc.).
4. **Discover or Create Agents**:
- **Always check the user's library first** with `find_library_agent` (these may be customized to their needs)
- Search the marketplace with `find_agent` for pre-built automations
- Find reusable components with `find_block`
- Create custom solutions with `create_agent` if nothing suitable exists
- Modify existing library agents with `edit_agent`
5. **Execute**: Run automations immediately, schedule them, or set up webhooks using `run_agent`. Test specific components with `run_block`.
6. **Show Results**: Display outputs using `agent_output`.
## AVAILABLE TOOLS
**Understanding & Discovery:**
- `add_understanding`: Create a memory about the user's business or use cases for future sessions
- `search_docs`: Search platform documentation for specific technical information
- `get_doc_page`: Retrieve full text of a specific documentation page
**Agent Discovery:**
- `find_library_agent`: Search the user's existing agents (CHECK HERE FIRST—these may be customized)
- `find_agent`: Search the marketplace for pre-built automations
- `find_block`: Find pre-written code units that perform specific tasks (agents are built from blocks)
**Agent Creation & Editing:**
- `create_agent`: Create a new automation agent
- `edit_agent`: Modify an agent in the user's library
**Execution & Output:**
- `run_agent`: Run an agent now, schedule it, or set up a webhook trigger
- `run_block`: Test or run a specific block independently
- `agent_output`: View results from previous agent runs
## BEHAVIORAL GUIDELINES
**Be Concise:**
- Target 2-5 short lines maximum
- Make every word count—no repetition or filler
- Use lightweight structure for scannability (bullets, numbered lists, short prompts)
- Avoid jargon (blocks, slugs, cron) unless the user asks
**Be Proactive:**
- Suggest next steps before being asked
- Anticipate needs based on conversation context and user information
- Look for opportunities to expand scope when relevant
- Reveal capabilities through action, not explanation
**Use Tools Effectively:**
- Select the right tool for each task
- **Always check `find_library_agent` before searching the marketplace**
- Use `add_understanding` to capture valuable business context
- When tool calls fail, try alternative approaches
## CRITICAL REMINDER
You are NOT a chatbot. You are NOT documentation. You are a partner who helps busy business owners get value quickly by showing proof through working automations. Bias toward action over explanation."""
# Module-level set to hold strong references to background tasks.
# This prevents asyncio from garbage collecting tasks before they complete.
# Tasks are automatically removed on completion via done_callback.
_background_tasks: set[asyncio.Task] = set()
async def _mark_operation_started(tool_call_id: str) -> bool:
"""Mark a long-running operation as started (Redis-based).
Returns True if successfully marked (operation was not already running),
False if operation was already running (lost race condition).
Raises exception if Redis is unavailable (fail-closed).
"""
redis = await get_redis_async()
key = f"{RUNNING_OPERATION_PREFIX}{tool_call_id}"
# SETNX with TTL - atomic "set if not exists"
result = await redis.set(key, "1", ex=config.long_running_operation_ttl, nx=True)
return result is not None
async def _mark_operation_completed(tool_call_id: str) -> None:
"""Mark a long-running operation as completed (remove Redis key).
This is best-effort - if Redis fails, the TTL will eventually clean up.
"""
try:
redis = await get_redis_async()
key = f"{RUNNING_OPERATION_PREFIX}{tool_call_id}"
await redis.delete(key)
except Exception as e:
# Non-critical: TTL will clean up eventually
logger.warning(f"Failed to delete running operation key {tool_call_id}: {e}")
def _is_langfuse_configured() -> bool:
@@ -75,6 +198,30 @@ def _is_langfuse_configured() -> bool:
)
async def _get_system_prompt_template(context: str) -> str:
"""Get the system prompt, trying Langfuse first with fallback to default.
Args:
context: The user context/information to compile into the prompt.
Returns:
The compiled system prompt string.
"""
if _is_langfuse_configured():
try:
# cache_ttl_seconds=0 disables SDK caching to always get the latest prompt
# Use asyncio.to_thread to avoid blocking the event loop
prompt = await asyncio.to_thread(
langfuse.get_prompt, config.langfuse_prompt_name, cache_ttl_seconds=0
)
return prompt.compile(users_information=context)
except Exception as e:
logger.warning(f"Failed to fetch prompt from Langfuse, using default: {e}")
# Fallback to default prompt
return DEFAULT_SYSTEM_PROMPT.format(users_information=context)
async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
"""Build the full system prompt including business understanding if available.
@@ -83,12 +230,8 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
If "default" and this is the user's first session, will use "onboarding" instead.
Returns:
Tuple of (compiled prompt string, Langfuse prompt object for tracing)
Tuple of (compiled prompt string, business understanding object)
"""
# cache_ttl_seconds=0 disables SDK caching to always get the latest prompt
prompt = langfuse.get_prompt(config.langfuse_prompt_name, cache_ttl_seconds=0)
# If user is authenticated, try to fetch their business understanding
understanding = None
if user_id:
@@ -97,12 +240,13 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
except Exception as e:
logger.warning(f"Failed to fetch business understanding: {e}")
understanding = None
if understanding:
context = format_understanding_for_prompt(understanding)
else:
context = "This is the first time you are meeting the user. Greet them and introduce them to the platform"
compiled = prompt.compile(users_information=context)
compiled = await _get_system_prompt_template(context)
return compiled, understanding
@@ -210,16 +354,6 @@ async def stream_chat_completion(
f"Streaming chat completion for session {session_id} for message {message} and user id {user_id}. Message is user message: {is_user_message}"
)
# Check if Langfuse is configured - required for chat functionality
if not _is_langfuse_configured():
logger.error("Chat request failed: Langfuse is not configured")
yield StreamError(
errorText="Chat service is not available. Langfuse must be configured "
"with LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY environment variables."
)
yield StreamFinish()
return
# Only fetch from Redis if session not provided (initial call)
if session is None:
session = await get_chat_session(session_id, user_id)
@@ -315,6 +449,7 @@ async def stream_chat_completion(
has_yielded_end = False
has_yielded_error = False
has_done_tool_call = False
has_long_running_tool_call = False # Track if we had a long-running tool call
has_received_text = False
text_streaming_ended = False
tool_response_messages: list[ChatMessage] = []
@@ -336,7 +471,6 @@ async def stream_chat_completion(
system_prompt=system_prompt,
text_block_id=text_block_id,
):
if isinstance(chunk, StreamTextStart):
# Emit text-start before first text delta
if not has_received_text:
@@ -394,13 +528,34 @@ async def stream_chat_completion(
if isinstance(chunk.output, str)
else orjson.dumps(chunk.output).decode("utf-8")
)
tool_response_messages.append(
ChatMessage(
role="tool",
content=result_content,
tool_call_id=chunk.toolCallId,
# Skip saving long-running operation responses - messages already saved in _yield_tool_call
# Use JSON parsing instead of substring matching to avoid false positives
is_long_running_response = False
try:
parsed = orjson.loads(result_content)
if isinstance(parsed, dict) and parsed.get("type") in (
"operation_started",
"operation_in_progress",
):
is_long_running_response = True
except (orjson.JSONDecodeError, TypeError):
pass # Not JSON or not a dict - treat as regular response
if is_long_running_response:
# Remove from accumulated_tool_calls since assistant message was already saved
accumulated_tool_calls[:] = [
tc
for tc in accumulated_tool_calls
if tc["id"] != chunk.toolCallId
]
has_long_running_tool_call = True
else:
tool_response_messages.append(
ChatMessage(
role="tool",
content=result_content,
tool_call_id=chunk.toolCallId,
)
)
)
has_done_tool_call = True
# Track if any tool execution failed
if not chunk.success:
@@ -576,7 +731,14 @@ async def stream_chat_completion(
logger.info(
f"Extended session messages, new message_count={len(session.messages)}"
)
if messages_to_save or has_appended_streaming_message:
# Save if there are regular (non-long-running) tool responses or streaming message.
# Long-running tools save their own state, but we still need to save regular tools
# that may be in the same response.
has_regular_tool_responses = len(tool_response_messages) > 0
if has_regular_tool_responses or (
not has_long_running_tool_call
and (messages_to_save or has_appended_streaming_message)
):
await upsert_chat_session(session)
else:
logger.info(
@@ -585,7 +747,9 @@ async def stream_chat_completion(
)
# If we did a tool call, stream the chat completion again to get the next response
if has_done_tool_call:
# Skip only if ALL tools were long-running (they handle their own completion)
has_regular_tools = len(tool_response_messages) > 0
if has_done_tool_call and (has_regular_tools or not has_long_running_tool_call):
logger.info(
"Tool call executed, streaming chat completion again to get assistant response"
)
@@ -725,6 +889,114 @@ async def _summarize_messages(
return summary or "No summary available."
def _ensure_tool_pairs_intact(
recent_messages: list[dict],
all_messages: list[dict],
start_index: int,
) -> list[dict]:
"""
Ensure tool_call/tool_response pairs stay together after slicing.
When slicing messages for context compaction, a naive slice can separate
an assistant message containing tool_calls from its corresponding tool
response messages. This causes API validation errors (e.g., Anthropic's
"unexpected tool_use_id found in tool_result blocks").
This function checks for orphan tool responses in the slice and extends
backwards to include their corresponding assistant messages.
Args:
recent_messages: The sliced messages to validate
all_messages: The complete message list (for looking up missing assistants)
start_index: The index in all_messages where recent_messages begins
Returns:
A potentially extended list of messages with tool pairs intact
"""
if not recent_messages:
return recent_messages
# Collect all tool_call_ids from assistant messages in the slice
available_tool_call_ids: set[str] = set()
for msg in recent_messages:
if msg.get("role") == "assistant" and msg.get("tool_calls"):
for tc in msg["tool_calls"]:
tc_id = tc.get("id")
if tc_id:
available_tool_call_ids.add(tc_id)
# Find orphan tool responses (tool messages whose tool_call_id is missing)
orphan_tool_call_ids: set[str] = set()
for msg in recent_messages:
if msg.get("role") == "tool":
tc_id = msg.get("tool_call_id")
if tc_id and tc_id not in available_tool_call_ids:
orphan_tool_call_ids.add(tc_id)
if not orphan_tool_call_ids:
# No orphans, slice is valid
return recent_messages
# Find the assistant messages that contain the orphan tool_call_ids
# Search backwards from start_index in all_messages
messages_to_prepend: list[dict] = []
for i in range(start_index - 1, -1, -1):
msg = all_messages[i]
if msg.get("role") == "assistant" and msg.get("tool_calls"):
msg_tool_ids = {tc.get("id") for tc in msg["tool_calls"] if tc.get("id")}
if msg_tool_ids & orphan_tool_call_ids:
# This assistant message has tool_calls we need
# Also collect its contiguous tool responses that follow it
assistant_and_responses: list[dict] = [msg]
# Scan forward from this assistant to collect tool responses
for j in range(i + 1, start_index):
following_msg = all_messages[j]
if following_msg.get("role") == "tool":
tool_id = following_msg.get("tool_call_id")
if tool_id and tool_id in msg_tool_ids:
assistant_and_responses.append(following_msg)
else:
# Stop at first non-tool message
break
# Prepend the assistant and its tool responses (maintain order)
messages_to_prepend = assistant_and_responses + messages_to_prepend
# Mark these as found
orphan_tool_call_ids -= msg_tool_ids
# Also add this assistant's tool_call_ids to available set
available_tool_call_ids |= msg_tool_ids
if not orphan_tool_call_ids:
# Found all missing assistants
break
if orphan_tool_call_ids:
# Some tool_call_ids couldn't be resolved - remove those tool responses
# This shouldn't happen in normal operation but handles edge cases
logger.warning(
f"Could not find assistant messages for tool_call_ids: {orphan_tool_call_ids}. "
"Removing orphan tool responses."
)
recent_messages = [
msg
for msg in recent_messages
if not (
msg.get("role") == "tool"
and msg.get("tool_call_id") in orphan_tool_call_ids
)
]
if messages_to_prepend:
logger.info(
f"Extended recent messages by {len(messages_to_prepend)} to preserve "
f"tool_call/tool_response pairs"
)
return messages_to_prepend + recent_messages
return recent_messages
async def _stream_chat_chunks(
session: ChatSession,
tools: list[ChatCompletionToolParam],
@@ -816,7 +1088,15 @@ async def _stream_chat_chunks(
# Always attempt mitigation when over limit, even with few messages
if messages:
# Split messages based on whether system prompt exists
recent_messages = messages[-KEEP_RECENT:]
# Calculate start index for the slice
slice_start = max(0, len(messages_dict) - KEEP_RECENT)
recent_messages = messages_dict[-KEEP_RECENT:]
# Ensure tool_call/tool_response pairs stay together
# This prevents API errors from orphan tool responses
recent_messages = _ensure_tool_pairs_intact(
recent_messages, messages_dict, slice_start
)
if has_system_prompt:
# Keep system prompt separate, summarize everything between system and recent
@@ -903,6 +1183,13 @@ async def _stream_chat_chunks(
if len(recent_messages) >= keep_count
else recent_messages
)
# Ensure tool pairs stay intact in the reduced slice
reduced_slice_start = max(
0, len(recent_messages) - keep_count
)
reduced_recent = _ensure_tool_pairs_intact(
reduced_recent, recent_messages, reduced_slice_start
)
if has_system_prompt:
messages = [
system_msg,
@@ -961,7 +1248,10 @@ async def _stream_chat_chunks(
# Create a base list excluding system prompt to avoid duplication
# This is the pool of messages we'll slice from in the loop
base_msgs = messages[1:] if has_system_prompt else messages
# Use messages_dict for type consistency with _ensure_tool_pairs_intact
base_msgs = (
messages_dict[1:] if has_system_prompt else messages_dict
)
# Try progressively smaller keep counts
new_token_count = token_count # Initialize with current count
@@ -984,6 +1274,12 @@ async def _stream_chat_chunks(
# Slice from base_msgs to get recent messages (without system prompt)
recent_messages = base_msgs[-keep_count:]
# Ensure tool pairs stay intact in the reduced slice
reduced_slice_start = max(0, len(base_msgs) - keep_count)
recent_messages = _ensure_tool_pairs_intact(
recent_messages, base_msgs, reduced_slice_start
)
if has_system_prompt:
messages = [system_msg] + recent_messages
else:
@@ -1260,17 +1556,19 @@ async def _yield_tool_call(
"""
Yield a tool call and its execution result.
For long-running tools, yields heartbeat events every 15 seconds to keep
the SSE connection alive through proxies and load balancers.
For tools marked with `is_long_running=True` (like agent generation), spawns a
background task so the operation survives SSE disconnections. For other tools,
yields heartbeat events every 15 seconds to keep the SSE connection alive.
Raises:
orjson.JSONDecodeError: If tool call arguments cannot be parsed as JSON
KeyError: If expected tool call fields are missing
TypeError: If tool call structure is invalid
"""
import uuid as uuid_module
tool_name = tool_calls[yield_idx]["function"]["name"]
tool_call_id = tool_calls[yield_idx]["id"]
logger.info(f"Yielding tool call: {tool_calls[yield_idx]}")
# Parse tool call arguments - handle empty arguments gracefully
raw_arguments = tool_calls[yield_idx]["function"]["arguments"]
@@ -1285,7 +1583,151 @@ async def _yield_tool_call(
input=arguments,
)
# Run tool execution in background task with heartbeats to keep connection alive
# Check if this tool is long-running (survives SSE disconnection)
tool = get_tool(tool_name)
if tool and tool.is_long_running:
# Atomic check-and-set: returns False if operation already running (lost race)
if not await _mark_operation_started(tool_call_id):
logger.info(
f"Tool call {tool_call_id} already in progress, returning status"
)
# Build dynamic message based on tool name
if tool_name == "create_agent":
in_progress_msg = "Agent creation already in progress. Please wait..."
elif tool_name == "edit_agent":
in_progress_msg = "Agent edit already in progress. Please wait..."
else:
in_progress_msg = f"{tool_name} already in progress. Please wait..."
yield StreamToolOutputAvailable(
toolCallId=tool_call_id,
toolName=tool_name,
output=OperationInProgressResponse(
message=in_progress_msg,
tool_call_id=tool_call_id,
).model_dump_json(),
success=True,
)
return
# Generate operation ID
operation_id = str(uuid_module.uuid4())
# Build a user-friendly message based on tool and arguments
if tool_name == "create_agent":
agent_desc = arguments.get("description", "")
# Truncate long descriptions for the message
desc_preview = (
(agent_desc[:100] + "...") if len(agent_desc) > 100 else agent_desc
)
pending_msg = (
f"Creating your agent: {desc_preview}"
if desc_preview
else "Creating agent... This may take a few minutes."
)
started_msg = (
"Agent creation started. You can close this tab - "
"check your library in a few minutes."
)
elif tool_name == "edit_agent":
changes = arguments.get("changes", "")
changes_preview = (changes[:100] + "...") if len(changes) > 100 else changes
pending_msg = (
f"Editing agent: {changes_preview}"
if changes_preview
else "Editing agent... This may take a few minutes."
)
started_msg = (
"Agent edit started. You can close this tab - "
"check your library in a few minutes."
)
else:
pending_msg = f"Running {tool_name}... This may take a few minutes."
started_msg = (
f"{tool_name} started. You can close this tab - "
"check back in a few minutes."
)
# Track appended messages for rollback on failure
assistant_message: ChatMessage | None = None
pending_message: ChatMessage | None = None
# Wrap session save and task creation in try-except to release lock on failure
try:
# Save assistant message with tool_call FIRST (required by LLM)
assistant_message = ChatMessage(
role="assistant",
content="",
tool_calls=[tool_calls[yield_idx]],
)
session.messages.append(assistant_message)
# Then save pending tool result
pending_message = ChatMessage(
role="tool",
content=OperationPendingResponse(
message=pending_msg,
operation_id=operation_id,
tool_name=tool_name,
).model_dump_json(),
tool_call_id=tool_call_id,
)
session.messages.append(pending_message)
await upsert_chat_session(session)
logger.info(
f"Saved pending operation {operation_id} for tool {tool_name} "
f"in session {session.session_id}"
)
# Store task reference in module-level set to prevent GC before completion
task = asyncio.create_task(
_execute_long_running_tool(
tool_name=tool_name,
parameters=arguments,
tool_call_id=tool_call_id,
operation_id=operation_id,
session_id=session.session_id,
user_id=session.user_id,
)
)
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
except Exception as e:
# Roll back appended messages to prevent data corruption on subsequent saves
if (
pending_message
and session.messages
and session.messages[-1] == pending_message
):
session.messages.pop()
if (
assistant_message
and session.messages
and session.messages[-1] == assistant_message
):
session.messages.pop()
# Release the Redis lock since the background task won't be spawned
await _mark_operation_completed(tool_call_id)
logger.error(
f"Failed to setup long-running tool {tool_name}: {e}", exc_info=True
)
raise
# Return immediately - don't wait for completion
yield StreamToolOutputAvailable(
toolCallId=tool_call_id,
toolName=tool_name,
output=OperationStartedResponse(
message=started_msg,
operation_id=operation_id,
tool_name=tool_name,
).model_dump_json(),
success=True,
)
return
# Normal flow: Run tool execution in background task with heartbeats
tool_task = asyncio.create_task(
execute_tool(
tool_name=tool_name,
@@ -1335,3 +1777,190 @@ async def _yield_tool_call(
)
yield tool_execution_response
async def _execute_long_running_tool(
tool_name: str,
parameters: dict[str, Any],
tool_call_id: str,
operation_id: str,
session_id: str,
user_id: str | None,
) -> None:
"""Execute a long-running tool in background and update chat history with result.
This function runs independently of the SSE connection, so the operation
survives if the user closes their browser tab.
"""
try:
# Load fresh session (not stale reference)
session = await get_chat_session(session_id, user_id)
if not session:
logger.error(f"Session {session_id} not found for background tool")
return
# Execute the actual tool
result = await execute_tool(
tool_name=tool_name,
parameters=parameters,
tool_call_id=tool_call_id,
user_id=user_id,
session=session,
)
# Update the pending message with result
await _update_pending_operation(
session_id=session_id,
tool_call_id=tool_call_id,
result=(
result.output
if isinstance(result.output, str)
else orjson.dumps(result.output).decode("utf-8")
),
)
logger.info(f"Background tool {tool_name} completed for session {session_id}")
# Generate LLM continuation so user sees response when they poll/refresh
await _generate_llm_continuation(session_id=session_id, user_id=user_id)
except Exception as e:
logger.error(f"Background tool {tool_name} failed: {e}", exc_info=True)
error_response = ErrorResponse(
message=f"Tool {tool_name} failed: {str(e)}",
)
await _update_pending_operation(
session_id=session_id,
tool_call_id=tool_call_id,
result=error_response.model_dump_json(),
)
finally:
await _mark_operation_completed(tool_call_id)
async def _update_pending_operation(
session_id: str,
tool_call_id: str,
result: str,
) -> None:
"""Update the pending tool message with final result.
This is called by background tasks when long-running operations complete.
"""
# Update the message in database
updated = await chat_db.update_tool_message_content(
session_id=session_id,
tool_call_id=tool_call_id,
new_content=result,
)
if updated:
# Invalidate Redis cache so next load gets fresh data
# Wrap in try/except to prevent cache failures from triggering error handling
# that would overwrite our successful DB update
try:
await invalidate_session_cache(session_id)
except Exception as e:
# Non-critical: cache will eventually be refreshed on next load
logger.warning(f"Failed to invalidate cache for session {session_id}: {e}")
logger.info(
f"Updated pending operation for tool_call_id {tool_call_id} "
f"in session {session_id}"
)
else:
logger.warning(
f"Failed to update pending operation for tool_call_id {tool_call_id} "
f"in session {session_id}"
)
async def _generate_llm_continuation(
session_id: str,
user_id: str | None,
) -> None:
"""Generate an LLM response after a long-running tool completes.
This is called by background tasks to continue the conversation
after a tool result is saved. The response is saved to the database
so users see it when they refresh or poll.
"""
try:
# Load fresh session from DB (bypass cache to get the updated tool result)
await invalidate_session_cache(session_id)
session = await get_chat_session(session_id, user_id)
if not session:
logger.error(f"Session {session_id} not found for LLM continuation")
return
# Build system prompt
system_prompt, _ = await _build_system_prompt(user_id)
# Build messages in OpenAI format
messages = session.to_openai_messages()
if system_prompt:
from openai.types.chat import ChatCompletionSystemMessageParam
system_message = ChatCompletionSystemMessageParam(
role="system",
content=system_prompt,
)
messages = [system_message] + messages
# Build extra_body for tracing
extra_body: dict[str, Any] = {
"posthogProperties": {
"environment": settings.config.app_env.value,
},
}
if user_id:
extra_body["user"] = user_id[:128]
extra_body["posthogDistinctId"] = user_id
if session_id:
extra_body["session_id"] = session_id[:128]
# Make non-streaming LLM call (no tools - just text response)
from typing import cast
from openai.types.chat import ChatCompletionMessageParam
# No tools parameter = text-only response (no tool calls)
response = await client.chat.completions.create(
model=config.model,
messages=cast(list[ChatCompletionMessageParam], messages),
extra_body=extra_body,
)
if response.choices and response.choices[0].message.content:
assistant_content = response.choices[0].message.content
# Reload session from DB to avoid race condition with user messages
# that may have been sent while we were generating the LLM response
fresh_session = await get_chat_session(session_id, user_id)
if not fresh_session:
logger.error(
f"Session {session_id} disappeared during LLM continuation"
)
return
# Save assistant message to database
assistant_message = ChatMessage(
role="assistant",
content=assistant_content,
)
fresh_session.messages.append(assistant_message)
# Save to database (not cache) to persist the response
await upsert_chat_session(fresh_session)
# Invalidate cache so next poll/refresh gets fresh data
await invalidate_session_cache(session_id)
logger.info(
f"Generated LLM continuation for session {session_id}, "
f"response length: {len(assistant_content)}"
)
else:
logger.warning(f"LLM continuation returned empty response for {session_id}")
except Exception as e:
logger.error(f"Failed to generate LLM continuation: {e}", exc_info=True)

View File

@@ -49,6 +49,11 @@ tools: list[ChatCompletionToolParam] = [
]
def get_tool(tool_name: str) -> BaseTool | None:
"""Get a tool instance by name."""
return TOOL_REGISTRY.get(tool_name)
async def execute_tool(
tool_name: str,
parameters: dict[str, Any],
@@ -57,7 +62,7 @@ async def execute_tool(
tool_call_id: str,
) -> "StreamToolOutputAvailable":
"""Execute a tool by name."""
tool = TOOL_REGISTRY.get(tool_name)
tool = get_tool(tool_name)
if not tool:
raise ValueError(f"Tool {tool_name} not found")

View File

@@ -36,6 +36,16 @@ class BaseTool:
"""Whether this tool requires authentication."""
return False
@property
def is_long_running(self) -> bool:
"""Whether this tool is long-running and should execute in background.
Long-running tools (like agent generation) are executed via background
tasks to survive SSE disconnections. The result is persisted to chat
history and visible when the user refreshes.
"""
return False
def as_openai_tool(self) -> ChatCompletionToolParam:
"""Convert to OpenAI tool format."""
return ChatCompletionToolParam(

View File

@@ -42,6 +42,10 @@ class CreateAgentTool(BaseTool):
def requires_auth(self) -> bool:
return True
@property
def is_long_running(self) -> bool:
return True
@property
def parameters(self) -> dict[str, Any]:
return {

View File

@@ -42,6 +42,10 @@ class EditAgentTool(BaseTool):
def requires_auth(self) -> bool:
return True
@property
def is_long_running(self) -> bool:
return True
@property
def parameters(self) -> dict[str, Any]:
return {

View File

@@ -107,7 +107,8 @@ class FindBlockTool(BaseTool):
block_id = result["content_id"]
block = get_block(block_id)
if block:
# Skip disabled blocks
if block and not block.disabled:
# Get input/output schemas
input_schema = {}
output_schema = {}

View File

@@ -28,6 +28,10 @@ class ResponseType(str, Enum):
BLOCK_OUTPUT = "block_output"
DOC_SEARCH_RESULTS = "doc_search_results"
DOC_PAGE = "doc_page"
# Long-running operation types
OPERATION_STARTED = "operation_started"
OPERATION_PENDING = "operation_pending"
OPERATION_IN_PROGRESS = "operation_in_progress"
# Base response model
@@ -334,3 +338,39 @@ class BlockOutputResponse(ToolResponseBase):
block_name: str
outputs: dict[str, list[Any]]
success: bool = True
# Long-running operation models
class OperationStartedResponse(ToolResponseBase):
"""Response when a long-running operation has been started in the background.
This is returned immediately to the client while the operation continues
to execute. The user can close the tab and check back later.
"""
type: ResponseType = ResponseType.OPERATION_STARTED
operation_id: str
tool_name: str
class OperationPendingResponse(ToolResponseBase):
"""Response stored in chat history while a long-running operation is executing.
This is persisted to the database so users see a pending state when they
refresh before the operation completes.
"""
type: ResponseType = ResponseType.OPERATION_PENDING
operation_id: str
tool_name: str
class OperationInProgressResponse(ToolResponseBase):
"""Response when an operation is already in progress.
Returned for idempotency when the same tool_call_id is requested again
while the background task is still running.
"""
type: ResponseType = ResponseType.OPERATION_IN_PROGRESS
tool_call_id: str

View File

@@ -164,9 +164,9 @@ async def test_process_review_action_approve_success(
"""Test successful review approval"""
# Mock the route functions
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
)
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
@@ -244,9 +244,9 @@ async def test_process_review_action_reject_success(
"""Test successful review rejection"""
# Mock the route functions
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
)
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
@@ -339,9 +339,9 @@ async def test_process_review_action_mixed_success(
# Mock the route functions
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
)
mock_get_reviews_for_user.return_value = {
"test_node_123": sample_pending_review,
@@ -463,9 +463,9 @@ async def test_process_review_action_review_not_found(
test_user_id: str,
) -> None:
"""Test error when review is not found"""
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
)
# Return empty dict to simulate review not found
mock_get_reviews_for_user.return_value = {}
@@ -506,7 +506,7 @@ async def test_process_review_action_review_not_found(
response = await client.post("/api/review/action", json=request_data)
assert response.status_code == 404
assert "No pending review found" in response.json()["detail"]
assert "Review(s) not found" in response.json()["detail"]
@pytest.mark.asyncio(loop_scope="session")
@@ -517,9 +517,9 @@ async def test_process_review_action_partial_failure(
test_user_id: str,
) -> None:
"""Test handling of partial failures in review processing"""
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
)
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
@@ -567,9 +567,9 @@ async def test_process_review_action_invalid_node_exec_id(
test_user_id: str,
) -> None:
"""Test failure when trying to process review with invalid node execution ID"""
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
)
# Return empty dict to simulate review not found
mock_get_reviews_for_user.return_value = {}
@@ -596,7 +596,7 @@ async def test_process_review_action_invalid_node_exec_id(
# Returns 404 when review is not found
assert response.status_code == 404
assert "No pending review found" in response.json()["detail"]
assert "Review(s) not found" in response.json()["detail"]
@pytest.mark.asyncio(loop_scope="session")
@@ -607,9 +607,9 @@ async def test_process_review_action_auto_approve_creates_auto_approval_records(
test_user_id: str,
) -> None:
"""Test that auto_approve_future_actions flag creates auto-approval records"""
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
)
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
@@ -737,9 +737,9 @@ async def test_process_review_action_without_auto_approve_still_loads_settings(
test_user_id: str,
) -> None:
"""Test that execution context is created with settings even without auto-approve"""
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
)
mock_get_reviews_for_user.return_value = {"test_node_123": sample_pending_review}
@@ -885,9 +885,9 @@ async def test_process_review_action_auto_approve_only_applies_to_approved_revie
reviewed_at=FIXED_NOW,
)
# Mock get_pending_reviews_by_node_exec_ids (called to find the graph_exec_id)
# Mock get_reviews_by_node_exec_ids (called to find the graph_exec_id)
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
)
# Need to return both reviews in WAITING state (before processing)
approved_review_waiting = PendingHumanReviewModel(
@@ -1031,9 +1031,9 @@ async def test_process_review_action_per_review_auto_approve_granularity(
test_user_id: str,
) -> None:
"""Test that auto-approval can be set per-review (granular control)"""
# Mock get_pending_reviews_by_node_exec_ids - return different reviews based on node_exec_id
# Mock get_reviews_by_node_exec_ids - return different reviews based on node_exec_id
mock_get_reviews_for_user = mocker.patch(
"backend.api.features.executions.review.routes.get_pending_reviews_by_node_exec_ids"
"backend.api.features.executions.review.routes.get_reviews_by_node_exec_ids"
)
# Create a mapping of node_exec_id to review

View File

@@ -14,9 +14,9 @@ from backend.data.execution import (
from backend.data.graph import get_graph_settings
from backend.data.human_review import (
create_auto_approval_record,
get_pending_reviews_by_node_exec_ids,
get_pending_reviews_for_execution,
get_pending_reviews_for_user,
get_reviews_by_node_exec_ids,
has_pending_reviews_for_graph_exec,
process_all_reviews_for_execution,
)
@@ -137,17 +137,17 @@ async def process_review_action(
detail="At least one review must be provided",
)
# Batch fetch all requested reviews
reviews_map = await get_pending_reviews_by_node_exec_ids(
# Batch fetch all requested reviews (regardless of status for idempotent handling)
reviews_map = await get_reviews_by_node_exec_ids(
list(all_request_node_ids), user_id
)
# Validate all reviews were found
# Validate all reviews were found (must exist, any status is OK for now)
missing_ids = all_request_node_ids - set(reviews_map.keys())
if missing_ids:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"No pending review found for node execution(s): {', '.join(missing_ids)}",
detail=f"Review(s) not found: {', '.join(missing_ids)}",
)
# Validate all reviews belong to the same execution

View File

@@ -188,6 +188,10 @@ class BlockHandler(ContentHandler):
try:
block_instance = block_cls()
# Skip disabled blocks - they shouldn't be indexed
if block_instance.disabled:
continue
# Build searchable text from block metadata
parts = []
if hasattr(block_instance, "name") and block_instance.name:
@@ -248,12 +252,19 @@ class BlockHandler(ContentHandler):
from backend.data.block import get_blocks
all_blocks = get_blocks()
total_blocks = len(all_blocks)
# Filter out disabled blocks - they're not indexed
enabled_block_ids = [
block_id
for block_id, block_cls in all_blocks.items()
if not block_cls().disabled
]
total_blocks = len(enabled_block_ids)
if total_blocks == 0:
return {"total": 0, "with_embeddings": 0, "without_embeddings": 0}
block_ids = list(all_blocks.keys())
block_ids = enabled_block_ids
placeholders = ",".join([f"${i+1}" for i in range(len(block_ids))])
embedded_result = await query_raw_with_schema(

View File

@@ -81,6 +81,7 @@ async def test_block_handler_get_missing_items(mocker):
mock_block_instance.name = "Calculator Block"
mock_block_instance.description = "Performs calculations"
mock_block_instance.categories = [MagicMock(value="MATH")]
mock_block_instance.disabled = False
mock_block_instance.input_schema.model_json_schema.return_value = {
"properties": {"expression": {"description": "Math expression to evaluate"}}
}
@@ -116,11 +117,18 @@ async def test_block_handler_get_stats(mocker):
"""Test BlockHandler returns correct stats."""
handler = BlockHandler()
# Mock get_blocks
# Mock get_blocks - each block class returns an instance with disabled=False
def make_mock_block_class():
mock_class = MagicMock()
mock_instance = MagicMock()
mock_instance.disabled = False
mock_class.return_value = mock_instance
return mock_class
mock_blocks = {
"block-1": MagicMock(),
"block-2": MagicMock(),
"block-3": MagicMock(),
"block-1": make_mock_block_class(),
"block-2": make_mock_block_class(),
"block-3": make_mock_block_class(),
}
# Mock embedded count query (2 blocks have embeddings)
@@ -309,6 +317,7 @@ async def test_block_handler_handles_missing_attributes():
mock_block_class = MagicMock()
mock_block_instance = MagicMock()
mock_block_instance.name = "Minimal Block"
mock_block_instance.disabled = False
# No description, categories, or schema
del mock_block_instance.description
del mock_block_instance.categories
@@ -342,6 +351,7 @@ async def test_block_handler_skips_failed_blocks():
good_instance.name = "Good Block"
good_instance.description = "Works fine"
good_instance.categories = []
good_instance.disabled = False
good_block.return_value = good_instance
bad_block = MagicMock()

View File

@@ -263,11 +263,14 @@ async def get_pending_review_by_node_exec_id(
return PendingHumanReviewModel.from_db(review, node_id=node_id)
async def get_pending_reviews_by_node_exec_ids(
async def get_reviews_by_node_exec_ids(
node_exec_ids: list[str], user_id: str
) -> dict[str, "PendingHumanReviewModel"]:
"""
Get multiple pending reviews by their node execution IDs in a single batch query.
Get multiple reviews by their node execution IDs regardless of status.
Unlike get_pending_reviews_by_node_exec_ids, this returns reviews in any status
(WAITING, APPROVED, REJECTED). Used for validation in idempotent operations.
Args:
node_exec_ids: List of node execution IDs to look up
@@ -283,7 +286,6 @@ async def get_pending_reviews_by_node_exec_ids(
where={
"nodeExecId": {"in": node_exec_ids},
"userId": user_id,
"status": ReviewStatus.WAITING,
}
)
@@ -407,38 +409,68 @@ async def process_all_reviews_for_execution(
) -> dict[str, PendingHumanReviewModel]:
"""Process all pending reviews for an execution with approve/reject decisions.
Handles race conditions gracefully: if a review was already processed with the
same decision by a concurrent request, it's treated as success rather than error.
Args:
user_id: User ID for ownership validation
review_decisions: Map of node_exec_id -> (status, reviewed_data, message)
Returns:
Dict of node_exec_id -> updated review model
Dict of node_exec_id -> updated review model (includes already-processed reviews)
"""
if not review_decisions:
return {}
node_exec_ids = list(review_decisions.keys())
# Get all reviews for validation
reviews = await PendingHumanReview.prisma().find_many(
# Get all reviews (both WAITING and already processed) for the user
all_reviews = await PendingHumanReview.prisma().find_many(
where={
"nodeExecId": {"in": node_exec_ids},
"userId": user_id,
"status": ReviewStatus.WAITING,
},
)
# Validate all reviews can be processed
if len(reviews) != len(node_exec_ids):
missing_ids = set(node_exec_ids) - {review.nodeExecId for review in reviews}
# Separate into pending and already-processed reviews
reviews_to_process = []
already_processed = []
for review in all_reviews:
if review.status == ReviewStatus.WAITING:
reviews_to_process.append(review)
else:
already_processed.append(review)
# Check for truly missing reviews (not found at all)
found_ids = {review.nodeExecId for review in all_reviews}
missing_ids = set(node_exec_ids) - found_ids
if missing_ids:
raise ValueError(
f"Reviews not found, access denied, or not in WAITING status: {', '.join(missing_ids)}"
f"Reviews not found or access denied: {', '.join(missing_ids)}"
)
# Create parallel update tasks
# Validate already-processed reviews have compatible status (same decision)
# This handles race conditions where another request processed the same reviews
for review in already_processed:
requested_status = review_decisions[review.nodeExecId][0]
if review.status != requested_status:
raise ValueError(
f"Review {review.nodeExecId} was already processed with status "
f"{review.status}, cannot change to {requested_status}"
)
# Log if we're handling a race condition (some reviews already processed)
if already_processed:
already_processed_ids = [r.nodeExecId for r in already_processed]
logger.info(
f"Race condition handled: {len(already_processed)} review(s) already "
f"processed by concurrent request: {already_processed_ids}"
)
# Create parallel update tasks for reviews that still need processing
update_tasks = []
for review in reviews:
for review in reviews_to_process:
new_status, reviewed_data, message = review_decisions[review.nodeExecId]
has_data_changes = reviewed_data is not None and reviewed_data != review.payload
@@ -463,7 +495,7 @@ async def process_all_reviews_for_execution(
update_tasks.append(task)
# Execute all updates in parallel and get updated reviews
updated_reviews = await asyncio.gather(*update_tasks)
updated_reviews = await asyncio.gather(*update_tasks) if update_tasks else []
# Note: Execution resumption is now handled at the API layer after ALL reviews
# for an execution are processed (both approved and rejected)
@@ -472,8 +504,11 @@ async def process_all_reviews_for_execution(
# Local import to avoid event loop conflicts in tests
from backend.data.execution import get_node_execution
# Combine updated reviews with already-processed ones (for idempotent response)
all_result_reviews = list(updated_reviews) + already_processed
result = {}
for review in updated_reviews:
for review in all_result_reviews:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
result[review.nodeExecId] = PendingHumanReviewModel.from_db(

View File

@@ -359,8 +359,8 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="The port for the Agent Generator service",
)
agentgenerator_timeout: int = Field(
default=120,
description="The timeout in seconds for Agent Generator service requests",
default=600,
description="The timeout in seconds for Agent Generator service requests (includes retries for rate limits)",
)
enable_example_blocks: bool = Field(

View File

@@ -1,41 +0,0 @@
"use client";
import { createContext, useContext, useRef, type ReactNode } from "react";
interface NewChatContextValue {
onNewChatClick: () => void;
setOnNewChatClick: (handler?: () => void) => void;
performNewChat?: () => void;
setPerformNewChat: (handler?: () => void) => void;
}
const NewChatContext = createContext<NewChatContextValue | null>(null);
export function NewChatProvider({ children }: { children: ReactNode }) {
const onNewChatRef = useRef<(() => void) | undefined>();
const performNewChatRef = useRef<(() => void) | undefined>();
const contextValueRef = useRef<NewChatContextValue>({
onNewChatClick() {
onNewChatRef.current?.();
},
setOnNewChatClick(handler?: () => void) {
onNewChatRef.current = handler;
},
performNewChat() {
performNewChatRef.current?.();
},
setPerformNewChat(handler?: () => void) {
performNewChatRef.current = handler;
},
});
return (
<NewChatContext.Provider value={contextValueRef.current}>
{children}
</NewChatContext.Provider>
);
}
export function useNewChat() {
return useContext(NewChatContext);
}

View File

@@ -1,12 +1,10 @@
"use client";
import { ChatLoader } from "@/components/contextual/Chat/components/ChatLoader/ChatLoader";
import { Text } from "@/components/atoms/Text/Text";
import { NAVBAR_HEIGHT_PX } from "@/lib/constants";
import type { ReactNode } from "react";
import { useEffect } from "react";
import { useNewChat } from "../../NewChatContext";
import { DesktopSidebar } from "./components/DesktopSidebar/DesktopSidebar";
import { LoadingState } from "./components/LoadingState/LoadingState";
import { MobileDrawer } from "./components/MobileDrawer/MobileDrawer";
import { MobileHeader } from "./components/MobileHeader/MobileHeader";
import { useCopilotShell } from "./useCopilotShell";
@@ -20,36 +18,21 @@ export function CopilotShell({ children }: Props) {
isMobile,
isDrawerOpen,
isLoading,
isCreatingSession,
isLoggedIn,
hasActiveSession,
sessions,
currentSessionId,
handleSelectSession,
handleOpenDrawer,
handleCloseDrawer,
handleDrawerOpenChange,
handleNewChat,
handleNewChatClick,
handleSessionClick,
hasNextPage,
isFetchingNextPage,
fetchNextPage,
isReadyToShowContent,
} = useCopilotShell();
const newChatContext = useNewChat();
const handleNewChatClickWrapper =
newChatContext?.onNewChatClick || handleNewChat;
useEffect(
function registerNewChatHandler() {
if (!newChatContext) return;
newChatContext.setPerformNewChat(handleNewChat);
return function cleanup() {
newChatContext.setPerformNewChat(undefined);
};
},
[newChatContext, handleNewChat],
);
if (!isLoggedIn) {
return (
<div className="flex h-full items-center justify-center">
@@ -70,9 +53,9 @@ export function CopilotShell({ children }: Props) {
isLoading={isLoading}
hasNextPage={hasNextPage}
isFetchingNextPage={isFetchingNextPage}
onSelectSession={handleSelectSession}
onSelectSession={handleSessionClick}
onFetchNextPage={fetchNextPage}
onNewChat={handleNewChatClickWrapper}
onNewChat={handleNewChatClick}
hasActiveSession={Boolean(hasActiveSession)}
/>
)}
@@ -80,7 +63,18 @@ export function CopilotShell({ children }: Props) {
<div className="relative flex min-h-0 flex-1 flex-col">
{isMobile && <MobileHeader onOpenDrawer={handleOpenDrawer} />}
<div className="flex min-h-0 flex-1 flex-col">
{isReadyToShowContent ? children : <LoadingState />}
{isCreatingSession ? (
<div className="flex h-full flex-1 flex-col items-center justify-center bg-[#f8f8f9]">
<div className="flex flex-col items-center gap-4">
<ChatLoader />
<Text variant="body" className="text-zinc-500">
Creating your chat...
</Text>
</div>
</div>
) : (
children
)}
</div>
</div>
@@ -92,9 +86,9 @@ export function CopilotShell({ children }: Props) {
isLoading={isLoading}
hasNextPage={hasNextPage}
isFetchingNextPage={isFetchingNextPage}
onSelectSession={handleSelectSession}
onSelectSession={handleSessionClick}
onFetchNextPage={fetchNextPage}
onNewChat={handleNewChatClickWrapper}
onNewChat={handleNewChatClick}
onClose={handleCloseDrawer}
onOpenChange={handleDrawerOpenChange}
hasActiveSession={Boolean(hasActiveSession)}

View File

@@ -1,15 +0,0 @@
import { Text } from "@/components/atoms/Text/Text";
import { ChatLoader } from "@/components/contextual/Chat/components/ChatLoader/ChatLoader";
export function LoadingState() {
return (
<div className="flex flex-1 items-center justify-center">
<div className="flex flex-col items-center gap-4">
<ChatLoader />
<Text variant="body" className="text-zinc-500">
Loading your chats...
</Text>
</div>
</div>
);
}

View File

@@ -3,17 +3,17 @@ import { useState } from "react";
export function useMobileDrawer() {
const [isDrawerOpen, setIsDrawerOpen] = useState(false);
function handleOpenDrawer() {
const handleOpenDrawer = () => {
setIsDrawerOpen(true);
}
};
function handleCloseDrawer() {
const handleCloseDrawer = () => {
setIsDrawerOpen(false);
}
};
function handleDrawerOpenChange(open: boolean) {
const handleDrawerOpenChange = (open: boolean) => {
setIsDrawerOpen(open);
}
};
return {
isDrawerOpen,

View File

@@ -1,7 +1,7 @@
import { useGetV2ListSessions } from "@/app/api/__generated__/endpoints/chat/chat";
import type { SessionSummaryResponse } from "@/app/api/__generated__/models/sessionSummaryResponse";
import { okData } from "@/app/api/helpers";
import { useEffect, useMemo, useState } from "react";
import { useEffect, useState } from "react";
const PAGE_SIZE = 50;
@@ -11,9 +11,11 @@ export interface UseSessionsPaginationArgs {
export function useSessionsPagination({ enabled }: UseSessionsPaginationArgs) {
const [offset, setOffset] = useState(0);
const [accumulatedSessions, setAccumulatedSessions] = useState<
SessionSummaryResponse[]
>([]);
const [totalCount, setTotalCount] = useState<number | null>(null);
const { data, isLoading, isFetching, isError } = useGetV2ListSessions(
@@ -43,17 +45,14 @@ export function useSessionsPagination({ enabled }: UseSessionsPaginationArgs) {
}
}, [data, offset, enabled]);
const hasNextPage = useMemo(() => {
if (totalCount === null) return false;
return accumulatedSessions.length < totalCount;
}, [accumulatedSessions.length, totalCount]);
const hasNextPage =
totalCount !== null && accumulatedSessions.length < totalCount;
const areAllSessionsLoaded = useMemo(() => {
if (totalCount === null) return false;
return (
accumulatedSessions.length >= totalCount && !isFetching && !isLoading
);
}, [accumulatedSessions.length, totalCount, isFetching, isLoading]);
const areAllSessionsLoaded =
totalCount !== null &&
accumulatedSessions.length >= totalCount &&
!isFetching &&
!isLoading;
useEffect(() => {
if (
@@ -67,17 +66,17 @@ export function useSessionsPagination({ enabled }: UseSessionsPaginationArgs) {
}
}, [hasNextPage, isFetching, isLoading, isError, totalCount]);
function fetchNextPage() {
const fetchNextPage = () => {
if (hasNextPage && !isFetching) {
setOffset((prev) => prev + PAGE_SIZE);
}
}
};
function reset() {
const reset = () => {
setOffset(0);
setAccumulatedSessions([]);
setTotalCount(null);
}
};
return {
sessions: accumulatedSessions,

View File

@@ -2,9 +2,7 @@ import type { SessionDetailResponse } from "@/app/api/__generated__/models/sessi
import type { SessionSummaryResponse } from "@/app/api/__generated__/models/sessionSummaryResponse";
import { format, formatDistanceToNow, isToday } from "date-fns";
export function convertSessionDetailToSummary(
session: SessionDetailResponse,
): SessionSummaryResponse {
export function convertSessionDetailToSummary(session: SessionDetailResponse) {
return {
id: session.id,
created_at: session.created_at,
@@ -13,17 +11,25 @@ export function convertSessionDetailToSummary(
};
}
export function filterVisibleSessions(
sessions: SessionSummaryResponse[],
): SessionSummaryResponse[] {
return sessions.filter(
(session) => session.updated_at !== session.created_at,
);
export function filterVisibleSessions(sessions: SessionSummaryResponse[]) {
const fiveMinutesAgo = Date.now() - 5 * 60 * 1000;
return sessions.filter((session) => {
const hasBeenUpdated = session.updated_at !== session.created_at;
if (hasBeenUpdated) return true;
const isRecentlyCreated =
new Date(session.created_at).getTime() > fiveMinutesAgo;
return isRecentlyCreated;
});
}
export function getSessionTitle(session: SessionSummaryResponse): string {
export function getSessionTitle(session: SessionSummaryResponse) {
if (session.title) return session.title;
const isNewSession = session.updated_at === session.created_at;
if (isNewSession) {
const createdDate = new Date(session.created_at);
if (isToday(createdDate)) {
@@ -31,12 +37,11 @@ export function getSessionTitle(session: SessionSummaryResponse): string {
}
return format(createdDate, "MMM d, yyyy");
}
return "Untitled Chat";
}
export function getSessionUpdatedLabel(
session: SessionSummaryResponse,
): string {
export function getSessionUpdatedLabel(session: SessionSummaryResponse) {
if (!session.updated_at) return "";
return formatDistanceToNow(new Date(session.updated_at), { addSuffix: true });
}
@@ -45,8 +50,10 @@ export function mergeCurrentSessionIntoList(
accumulatedSessions: SessionSummaryResponse[],
currentSessionId: string | null,
currentSessionData: SessionDetailResponse | null | undefined,
): SessionSummaryResponse[] {
recentlyCreatedSessions?: Map<string, SessionSummaryResponse>,
) {
const filteredSessions: SessionSummaryResponse[] = [];
const addedIds = new Set<string>();
if (accumulatedSessions.length > 0) {
const visibleSessions = filterVisibleSessions(accumulatedSessions);
@@ -61,105 +68,39 @@ export function mergeCurrentSessionIntoList(
);
if (!isInVisible) {
filteredSessions.push(currentInAll);
addedIds.add(currentInAll.id);
}
}
}
filteredSessions.push(...visibleSessions);
for (const session of visibleSessions) {
if (!addedIds.has(session.id)) {
filteredSessions.push(session);
addedIds.add(session.id);
}
}
}
if (currentSessionId && currentSessionData) {
const isCurrentInList = filteredSessions.some(
(s) => s.id === currentSessionId,
);
if (!isCurrentInList) {
if (!addedIds.has(currentSessionId)) {
const summarySession = convertSessionDetailToSummary(currentSessionData);
filteredSessions.unshift(summarySession);
addedIds.add(currentSessionId);
}
}
if (recentlyCreatedSessions) {
for (const [sessionId, sessionData] of recentlyCreatedSessions) {
if (!addedIds.has(sessionId)) {
filteredSessions.unshift(sessionData);
addedIds.add(sessionId);
}
}
}
return filteredSessions;
}
export function getCurrentSessionId(
searchParams: URLSearchParams,
): string | null {
export function getCurrentSessionId(searchParams: URLSearchParams) {
return searchParams.get("sessionId");
}
export function shouldAutoSelectSession(
areAllSessionsLoaded: boolean,
hasAutoSelectedSession: boolean,
paramSessionId: string | null,
visibleSessions: SessionSummaryResponse[],
accumulatedSessions: SessionSummaryResponse[],
isLoading: boolean,
totalCount: number | null,
): {
shouldSelect: boolean;
sessionIdToSelect: string | null;
shouldCreate: boolean;
} {
if (!areAllSessionsLoaded || hasAutoSelectedSession) {
return {
shouldSelect: false,
sessionIdToSelect: null,
shouldCreate: false,
};
}
if (paramSessionId) {
return {
shouldSelect: false,
sessionIdToSelect: null,
shouldCreate: false,
};
}
if (visibleSessions.length > 0) {
return {
shouldSelect: true,
sessionIdToSelect: visibleSessions[0].id,
shouldCreate: false,
};
}
if (accumulatedSessions.length === 0 && !isLoading && totalCount === 0) {
return { shouldSelect: false, sessionIdToSelect: null, shouldCreate: true };
}
if (totalCount === 0) {
return {
shouldSelect: false,
sessionIdToSelect: null,
shouldCreate: false,
};
}
return { shouldSelect: false, sessionIdToSelect: null, shouldCreate: false };
}
export function checkReadyToShowContent(
areAllSessionsLoaded: boolean,
paramSessionId: string | null,
accumulatedSessions: SessionSummaryResponse[],
isCurrentSessionLoading: boolean,
currentSessionData: SessionDetailResponse | null | undefined,
hasAutoSelectedSession: boolean,
): boolean {
if (!areAllSessionsLoaded) return false;
if (paramSessionId) {
const sessionFound = accumulatedSessions.some(
(s) => s.id === paramSessionId,
);
return (
sessionFound ||
(!isCurrentSessionLoading &&
currentSessionData !== undefined &&
currentSessionData !== null)
);
}
return hasAutoSelectedSession;
}

View File

@@ -1,26 +1,24 @@
"use client";
import {
getGetV2GetSessionQueryKey,
getGetV2ListSessionsQueryKey,
useGetV2GetSession,
} from "@/app/api/__generated__/endpoints/chat/chat";
import { okData } from "@/app/api/helpers";
import { useChatStore } from "@/components/contextual/Chat/chat-store";
import { useBreakpoint } from "@/lib/hooks/useBreakpoint";
import { useSupabase } from "@/lib/supabase/hooks/useSupabase";
import { useQueryClient } from "@tanstack/react-query";
import { usePathname, useRouter, useSearchParams } from "next/navigation";
import { useEffect, useRef, useState } from "react";
import { usePathname, useSearchParams } from "next/navigation";
import { useRef } from "react";
import { useCopilotStore } from "../../copilot-page-store";
import { useCopilotSessionId } from "../../useCopilotSessionId";
import { useMobileDrawer } from "./components/MobileDrawer/useMobileDrawer";
import { useSessionsPagination } from "./components/SessionsList/useSessionsPagination";
import {
checkReadyToShowContent,
filterVisibleSessions,
getCurrentSessionId,
mergeCurrentSessionIntoList,
} from "./helpers";
import { getCurrentSessionId } from "./helpers";
import { useShellSessionList } from "./useShellSessionList";
export function useCopilotShell() {
const router = useRouter();
const pathname = usePathname();
const searchParams = useSearchParams();
const queryClient = useQueryClient();
@@ -29,6 +27,8 @@ export function useCopilotShell() {
const isMobile =
breakpoint === "base" || breakpoint === "sm" || breakpoint === "md";
const { urlSessionId, setUrlSessionId } = useCopilotSessionId();
const isOnHomepage = pathname === "/copilot";
const paramSessionId = searchParams.get("sessionId");
@@ -41,114 +41,113 @@ export function useCopilotShell() {
const paginationEnabled = !isMobile || isDrawerOpen || !!paramSessionId;
const {
sessions: accumulatedSessions,
isLoading: isSessionsLoading,
isFetching: isSessionsFetching,
hasNextPage,
areAllSessionsLoaded,
fetchNextPage,
reset: resetPagination,
} = useSessionsPagination({
enabled: paginationEnabled,
});
const currentSessionId = getCurrentSessionId(searchParams);
const { data: currentSessionData, isLoading: isCurrentSessionLoading } =
useGetV2GetSession(currentSessionId || "", {
const { data: currentSessionData } = useGetV2GetSession(
currentSessionId || "",
{
query: {
enabled: !!currentSessionId,
select: okData,
},
});
const [hasAutoSelectedSession, setHasAutoSelectedSession] = useState(false);
const hasAutoSelectedRef = useRef(false);
// Mark as auto-selected when sessionId is in URL
useEffect(() => {
if (paramSessionId && !hasAutoSelectedRef.current) {
hasAutoSelectedRef.current = true;
setHasAutoSelectedSession(true);
}
}, [paramSessionId]);
// On homepage without sessionId, mark as ready immediately
useEffect(() => {
if (isOnHomepage && !paramSessionId && !hasAutoSelectedRef.current) {
hasAutoSelectedRef.current = true;
setHasAutoSelectedSession(true);
}
}, [isOnHomepage, paramSessionId]);
// Invalidate sessions list when navigating to homepage (to show newly created sessions)
useEffect(() => {
if (isOnHomepage && !paramSessionId) {
queryClient.invalidateQueries({
queryKey: getGetV2ListSessionsQueryKey(),
});
}
}, [isOnHomepage, paramSessionId, queryClient]);
// Reset pagination when query becomes disabled
const prevPaginationEnabledRef = useRef(paginationEnabled);
useEffect(() => {
if (prevPaginationEnabledRef.current && !paginationEnabled) {
resetPagination();
resetAutoSelect();
}
prevPaginationEnabledRef.current = paginationEnabled;
}, [paginationEnabled, resetPagination]);
const sessions = mergeCurrentSessionIntoList(
accumulatedSessions,
currentSessionId,
currentSessionData,
},
);
const visibleSessions = filterVisibleSessions(sessions);
const {
sessions,
isLoading,
isSessionsFetching,
hasNextPage,
fetchNextPage,
resetPagination,
recentlyCreatedSessionsRef,
} = useShellSessionList({
paginationEnabled,
currentSessionId,
currentSessionData,
isOnHomepage,
paramSessionId,
});
const sidebarSelectedSessionId =
isOnHomepage && !paramSessionId ? null : currentSessionId;
const stopStream = useChatStore((s) => s.stopStream);
const onStreamComplete = useChatStore((s) => s.onStreamComplete);
const isStreaming = useCopilotStore((s) => s.isStreaming);
const isCreatingSession = useCopilotStore((s) => s.isCreatingSession);
const setIsSwitchingSession = useCopilotStore((s) => s.setIsSwitchingSession);
const openInterruptModal = useCopilotStore((s) => s.openInterruptModal);
const isReadyToShowContent = isOnHomepage
? true
: checkReadyToShowContent(
areAllSessionsLoaded,
paramSessionId,
accumulatedSessions,
isCurrentSessionLoading,
currentSessionData,
hasAutoSelectedSession,
);
const pendingActionRef = useRef<(() => void) | null>(null);
function handleSelectSession(sessionId: string) {
// Navigate using replaceState to avoid full page reload
window.history.replaceState(null, "", `/copilot?sessionId=${sessionId}`);
// Force a re-render by updating the URL through router
router.replace(`/copilot?sessionId=${sessionId}`);
async function stopCurrentStream() {
if (!currentSessionId) return;
setIsSwitchingSession(true);
await new Promise<void>((resolve) => {
const unsubscribe = onStreamComplete((completedId) => {
if (completedId === currentSessionId) {
clearTimeout(timeout);
unsubscribe();
resolve();
}
});
const timeout = setTimeout(() => {
unsubscribe();
resolve();
}, 3000);
stopStream(currentSessionId);
});
queryClient.invalidateQueries({
queryKey: getGetV2GetSessionQueryKey(currentSessionId),
});
setIsSwitchingSession(false);
}
function selectSession(sessionId: string) {
if (sessionId === currentSessionId) return;
if (recentlyCreatedSessionsRef.current.has(sessionId)) {
queryClient.invalidateQueries({
queryKey: getGetV2GetSessionQueryKey(sessionId),
});
}
setUrlSessionId(sessionId, { shallow: false });
if (isMobile) handleCloseDrawer();
}
function handleNewChat() {
resetAutoSelect();
function startNewChat() {
resetPagination();
// Invalidate and refetch sessions list to ensure newly created sessions appear
queryClient.invalidateQueries({
queryKey: getGetV2ListSessionsQueryKey(),
});
window.history.replaceState(null, "", "/copilot");
router.replace("/copilot");
setUrlSessionId(null, { shallow: false });
if (isMobile) handleCloseDrawer();
}
function resetAutoSelect() {
hasAutoSelectedRef.current = false;
setHasAutoSelectedSession(false);
function handleSessionClick(sessionId: string) {
if (sessionId === currentSessionId) return;
if (isStreaming) {
pendingActionRef.current = async () => {
await stopCurrentStream();
selectSession(sessionId);
};
openInterruptModal(pendingActionRef.current);
} else {
selectSession(sessionId);
}
}
const isLoading = isSessionsLoading && accumulatedSessions.length === 0;
function handleNewChatClick() {
if (isStreaming) {
pendingActionRef.current = async () => {
await stopCurrentStream();
startNewChat();
};
openInterruptModal(pendingActionRef.current);
} else {
startNewChat();
}
}
return {
isMobile,
@@ -156,17 +155,17 @@ export function useCopilotShell() {
isLoggedIn,
hasActiveSession:
Boolean(currentSessionId) && (!isOnHomepage || Boolean(paramSessionId)),
isLoading,
sessions: visibleSessions,
currentSessionId: sidebarSelectedSessionId,
handleSelectSession,
isLoading: isLoading || isCreatingSession,
isCreatingSession,
sessions,
currentSessionId: urlSessionId,
handleOpenDrawer,
handleCloseDrawer,
handleDrawerOpenChange,
handleNewChat,
handleNewChatClick,
handleSessionClick,
hasNextPage,
isFetchingNextPage: isSessionsFetching,
fetchNextPage,
isReadyToShowContent,
};
}

View File

@@ -0,0 +1,113 @@
import { getGetV2ListSessionsQueryKey } from "@/app/api/__generated__/endpoints/chat/chat";
import type { SessionDetailResponse } from "@/app/api/__generated__/models/sessionDetailResponse";
import type { SessionSummaryResponse } from "@/app/api/__generated__/models/sessionSummaryResponse";
import { useChatStore } from "@/components/contextual/Chat/chat-store";
import { useQueryClient } from "@tanstack/react-query";
import { useEffect, useMemo, useRef } from "react";
import { useSessionsPagination } from "./components/SessionsList/useSessionsPagination";
import {
convertSessionDetailToSummary,
filterVisibleSessions,
mergeCurrentSessionIntoList,
} from "./helpers";
interface UseShellSessionListArgs {
paginationEnabled: boolean;
currentSessionId: string | null;
currentSessionData: SessionDetailResponse | null | undefined;
isOnHomepage: boolean;
paramSessionId: string | null;
}
export function useShellSessionList({
paginationEnabled,
currentSessionId,
currentSessionData,
isOnHomepage,
paramSessionId,
}: UseShellSessionListArgs) {
const queryClient = useQueryClient();
const onStreamComplete = useChatStore((s) => s.onStreamComplete);
const {
sessions: accumulatedSessions,
isLoading: isSessionsLoading,
isFetching: isSessionsFetching,
hasNextPage,
fetchNextPage,
reset: resetPagination,
} = useSessionsPagination({
enabled: paginationEnabled,
});
const recentlyCreatedSessionsRef = useRef<
Map<string, SessionSummaryResponse>
>(new Map());
useEffect(() => {
if (isOnHomepage && !paramSessionId) {
queryClient.invalidateQueries({
queryKey: getGetV2ListSessionsQueryKey(),
});
}
}, [isOnHomepage, paramSessionId, queryClient]);
useEffect(() => {
if (currentSessionId && currentSessionData) {
const isNewSession =
currentSessionData.updated_at === currentSessionData.created_at;
const isNotInAccumulated = !accumulatedSessions.some(
(s) => s.id === currentSessionId,
);
if (isNewSession || isNotInAccumulated) {
const summary = convertSessionDetailToSummary(currentSessionData);
recentlyCreatedSessionsRef.current.set(currentSessionId, summary);
}
}
}, [currentSessionId, currentSessionData, accumulatedSessions]);
useEffect(() => {
for (const sessionId of recentlyCreatedSessionsRef.current.keys()) {
if (accumulatedSessions.some((s) => s.id === sessionId)) {
recentlyCreatedSessionsRef.current.delete(sessionId);
}
}
}, [accumulatedSessions]);
useEffect(() => {
const unsubscribe = onStreamComplete(() => {
queryClient.invalidateQueries({
queryKey: getGetV2ListSessionsQueryKey(),
});
});
return unsubscribe;
}, [onStreamComplete, queryClient]);
const sessions = useMemo(
() =>
mergeCurrentSessionIntoList(
accumulatedSessions,
currentSessionId,
currentSessionData,
recentlyCreatedSessionsRef.current,
),
[accumulatedSessions, currentSessionId, currentSessionData],
);
const visibleSessions = useMemo(
() => filterVisibleSessions(sessions),
[sessions],
);
const isLoading = isSessionsLoading && accumulatedSessions.length === 0;
return {
sessions: visibleSessions,
isLoading,
isSessionsFetching,
hasNextPage,
fetchNextPage,
resetPagination,
recentlyCreatedSessionsRef,
};
}

View File

@@ -0,0 +1,56 @@
"use client";
import { create } from "zustand";
interface CopilotStoreState {
isStreaming: boolean;
isSwitchingSession: boolean;
isCreatingSession: boolean;
isInterruptModalOpen: boolean;
pendingAction: (() => void) | null;
}
interface CopilotStoreActions {
setIsStreaming: (isStreaming: boolean) => void;
setIsSwitchingSession: (isSwitchingSession: boolean) => void;
setIsCreatingSession: (isCreating: boolean) => void;
openInterruptModal: (onConfirm: () => void) => void;
confirmInterrupt: () => void;
cancelInterrupt: () => void;
}
type CopilotStore = CopilotStoreState & CopilotStoreActions;
export const useCopilotStore = create<CopilotStore>((set, get) => ({
isStreaming: false,
isSwitchingSession: false,
isCreatingSession: false,
isInterruptModalOpen: false,
pendingAction: null,
setIsStreaming(isStreaming) {
set({ isStreaming });
},
setIsSwitchingSession(isSwitchingSession) {
set({ isSwitchingSession });
},
setIsCreatingSession(isCreatingSession) {
set({ isCreatingSession });
},
openInterruptModal(onConfirm) {
set({ isInterruptModalOpen: true, pendingAction: onConfirm });
},
confirmInterrupt() {
const { pendingAction } = get();
set({ isInterruptModalOpen: false, pendingAction: null });
if (pendingAction) pendingAction();
},
cancelInterrupt() {
set({ isInterruptModalOpen: false, pendingAction: null });
},
}));

View File

@@ -1,28 +1,5 @@
import type { User } from "@supabase/supabase-js";
export type PageState =
| { type: "welcome" }
| { type: "newChat" }
| { type: "creating"; prompt: string }
| { type: "chat"; sessionId: string; initialPrompt?: string };
export function getInitialPromptFromState(
pageState: PageState,
storedInitialPrompt: string | undefined,
) {
if (storedInitialPrompt) return storedInitialPrompt;
if (pageState.type === "creating") return pageState.prompt;
if (pageState.type === "chat") return pageState.initialPrompt;
}
export function shouldResetToWelcome(pageState: PageState) {
return (
pageState.type !== "newChat" &&
pageState.type !== "creating" &&
pageState.type !== "welcome"
);
}
export function getGreetingName(user?: User | null): string {
if (!user) return "there";
const metadata = user.user_metadata as Record<string, unknown> | undefined;

View File

@@ -1,11 +1,6 @@
import type { ReactNode } from "react";
import { NewChatProvider } from "./NewChatContext";
import { CopilotShell } from "./components/CopilotShell/CopilotShell";
export default function CopilotLayout({ children }: { children: ReactNode }) {
return (
<NewChatProvider>
<CopilotShell>{children}</CopilotShell>
</NewChatProvider>
);
return <CopilotShell>{children}</CopilotShell>;
}

View File

@@ -1,22 +1,25 @@
"use client";
import { Skeleton } from "@/components/__legacy__/ui/skeleton";
import { Button } from "@/components/atoms/Button/Button";
import { Skeleton } from "@/components/atoms/Skeleton/Skeleton";
import { Text } from "@/components/atoms/Text/Text";
import { Chat } from "@/components/contextual/Chat/Chat";
import { ChatInput } from "@/components/contextual/Chat/components/ChatInput/ChatInput";
import { ChatLoader } from "@/components/contextual/Chat/components/ChatLoader/ChatLoader";
import { Dialog } from "@/components/molecules/Dialog/Dialog";
import { useCopilotStore } from "./copilot-page-store";
import { useCopilotPage } from "./useCopilotPage";
export default function CopilotPage() {
const { state, handlers } = useCopilotPage();
const isInterruptModalOpen = useCopilotStore((s) => s.isInterruptModalOpen);
const confirmInterrupt = useCopilotStore((s) => s.confirmInterrupt);
const cancelInterrupt = useCopilotStore((s) => s.cancelInterrupt);
const {
greetingName,
quickActions,
isLoading,
pageState,
isNewChatModalOpen,
hasSession,
initialPrompt,
isReady,
} = state;
const {
@@ -24,24 +27,16 @@ export default function CopilotPage() {
startChatWithPrompt,
handleSessionNotFound,
handleStreamingChange,
handleCancelNewChat,
proceedWithNewChat,
handleNewChatModalOpen,
} = handlers;
if (!isReady) {
return null;
}
if (!isReady) return null;
// Show Chat when we have an active session
if (pageState.type === "chat") {
if (hasSession) {
return (
<div className="flex h-full flex-col">
<Chat
key={pageState.sessionId ?? "welcome"}
className="flex-1"
urlSessionId={pageState.sessionId}
initialPrompt={pageState.initialPrompt}
initialPrompt={initialPrompt}
onSessionNotFound={handleSessionNotFound}
onStreamingChange={handleStreamingChange}
/>
@@ -49,31 +44,33 @@ export default function CopilotPage() {
title="Interrupt current chat?"
styling={{ maxWidth: 300, width: "100%" }}
controlled={{
isOpen: isNewChatModalOpen,
set: handleNewChatModalOpen,
isOpen: isInterruptModalOpen,
set: (open) => {
if (!open) cancelInterrupt();
},
}}
onClose={handleCancelNewChat}
onClose={cancelInterrupt}
>
<Dialog.Content>
<div className="flex flex-col gap-4">
<Text variant="body">
The current chat response will be interrupted. Are you sure you
want to start a new chat?
want to continue?
</Text>
<Dialog.Footer>
<Button
type="button"
variant="outline"
onClick={handleCancelNewChat}
onClick={cancelInterrupt}
>
Cancel
</Button>
<Button
type="button"
variant="primary"
onClick={proceedWithNewChat}
onClick={confirmInterrupt}
>
Start new chat
Continue
</Button>
</Dialog.Footer>
</div>
@@ -83,34 +80,6 @@ export default function CopilotPage() {
);
}
if (pageState.type === "newChat") {
return (
<div className="flex h-full flex-1 flex-col items-center justify-center bg-[#f8f8f9]">
<div className="flex flex-col items-center gap-4">
<ChatLoader />
<Text variant="body" className="text-zinc-500">
Loading your chats...
</Text>
</div>
</div>
);
}
// Show loading state while creating session and sending first message
if (pageState.type === "creating") {
return (
<div className="flex h-full flex-1 flex-col items-center justify-center bg-[#f8f8f9]">
<div className="flex flex-col items-center gap-4">
<ChatLoader />
<Text variant="body" className="text-zinc-500">
Loading your chats...
</Text>
</div>
</div>
);
}
// Show Welcome screen
return (
<div className="flex h-full flex-1 items-center justify-center overflow-y-auto bg-[#f8f8f9] px-6 py-10">
<div className="w-full text-center">

View File

@@ -1,4 +1,7 @@
import { postV2CreateSession } from "@/app/api/__generated__/endpoints/chat/chat";
import {
getGetV2ListSessionsQueryKey,
postV2CreateSession,
} from "@/app/api/__generated__/endpoints/chat/chat";
import { useToast } from "@/components/molecules/Toast/use-toast";
import { getHomepageRoute } from "@/lib/constants";
import { useSupabase } from "@/lib/supabase/hooks/useSupabase";
@@ -7,81 +10,27 @@ import {
type FlagValues,
useGetFlag,
} from "@/services/feature-flags/use-get-flag";
import { SessionKey, sessionStorage } from "@/services/storage/session-storage";
import * as Sentry from "@sentry/nextjs";
import { useQueryClient } from "@tanstack/react-query";
import { useFlags } from "launchdarkly-react-client-sdk";
import { useRouter } from "next/navigation";
import { useEffect, useReducer } from "react";
import { useNewChat } from "./NewChatContext";
import { getGreetingName, getQuickActions, type PageState } from "./helpers";
import { useCopilotURLState } from "./useCopilotURLState";
type CopilotState = {
pageState: PageState;
isStreaming: boolean;
isNewChatModalOpen: boolean;
initialPrompts: Record<string, string>;
previousSessionId: string | null;
};
type CopilotAction =
| { type: "setPageState"; pageState: PageState }
| { type: "setStreaming"; isStreaming: boolean }
| { type: "setNewChatModalOpen"; isOpen: boolean }
| { type: "setInitialPrompt"; sessionId: string; prompt: string }
| { type: "setPreviousSessionId"; sessionId: string | null };
function isSamePageState(next: PageState, current: PageState) {
if (next.type !== current.type) return false;
if (next.type === "creating" && current.type === "creating") {
return next.prompt === current.prompt;
}
if (next.type === "chat" && current.type === "chat") {
return (
next.sessionId === current.sessionId &&
next.initialPrompt === current.initialPrompt
);
}
return true;
}
function copilotReducer(
state: CopilotState,
action: CopilotAction,
): CopilotState {
if (action.type === "setPageState") {
if (isSamePageState(action.pageState, state.pageState)) return state;
return { ...state, pageState: action.pageState };
}
if (action.type === "setStreaming") {
if (action.isStreaming === state.isStreaming) return state;
return { ...state, isStreaming: action.isStreaming };
}
if (action.type === "setNewChatModalOpen") {
if (action.isOpen === state.isNewChatModalOpen) return state;
return { ...state, isNewChatModalOpen: action.isOpen };
}
if (action.type === "setInitialPrompt") {
if (state.initialPrompts[action.sessionId] === action.prompt) return state;
return {
...state,
initialPrompts: {
...state.initialPrompts,
[action.sessionId]: action.prompt,
},
};
}
if (action.type === "setPreviousSessionId") {
if (state.previousSessionId === action.sessionId) return state;
return { ...state, previousSessionId: action.sessionId };
}
return state;
}
import { useEffect } from "react";
import { useCopilotStore } from "./copilot-page-store";
import { getGreetingName, getQuickActions } from "./helpers";
import { useCopilotSessionId } from "./useCopilotSessionId";
export function useCopilotPage() {
const router = useRouter();
const queryClient = useQueryClient();
const { user, isLoggedIn, isUserLoading } = useSupabase();
const { toast } = useToast();
const { urlSessionId, setUrlSessionId } = useCopilotSessionId();
const setIsStreaming = useCopilotStore((s) => s.setIsStreaming);
const isCreating = useCopilotStore((s) => s.isCreatingSession);
const setIsCreating = useCopilotStore((s) => s.setIsCreatingSession);
const isChatEnabled = useGetFlag(Flag.CHAT);
const flags = useFlags<FlagValues>();
const homepageRoute = getHomepageRoute(isChatEnabled);
@@ -91,86 +40,27 @@ export function useCopilotPage() {
const isFlagReady =
!isLaunchDarklyConfigured || flags[Flag.CHAT] !== undefined;
const [state, dispatch] = useReducer(copilotReducer, {
pageState: { type: "welcome" },
isStreaming: false,
isNewChatModalOpen: false,
initialPrompts: {},
previousSessionId: null,
});
const newChatContext = useNewChat();
const greetingName = getGreetingName(user);
const quickActions = getQuickActions();
function setPageState(pageState: PageState) {
dispatch({ type: "setPageState", pageState });
}
const hasSession = Boolean(urlSessionId);
const initialPrompt = urlSessionId
? getInitialPrompt(urlSessionId)
: undefined;
function setInitialPrompt(sessionId: string, prompt: string) {
dispatch({ type: "setInitialPrompt", sessionId, prompt });
}
function setPreviousSessionId(sessionId: string | null) {
dispatch({ type: "setPreviousSessionId", sessionId });
}
const { setUrlSessionId } = useCopilotURLState({
pageState: state.pageState,
initialPrompts: state.initialPrompts,
previousSessionId: state.previousSessionId,
setPageState,
setInitialPrompt,
setPreviousSessionId,
});
useEffect(
function registerNewChatHandler() {
if (!newChatContext) return;
newChatContext.setOnNewChatClick(handleNewChatClick);
return function cleanup() {
newChatContext.setOnNewChatClick(undefined);
};
},
[newChatContext, handleNewChatClick],
);
useEffect(
function transitionNewChatToWelcome() {
if (state.pageState.type === "newChat") {
function setWelcomeState() {
dispatch({ type: "setPageState", pageState: { type: "welcome" } });
}
const timer = setTimeout(setWelcomeState, 300);
return function cleanup() {
clearTimeout(timer);
};
}
},
[state.pageState.type],
);
useEffect(
function ensureAccess() {
if (!isFlagReady) return;
if (isChatEnabled === false) {
router.replace(homepageRoute);
}
},
[homepageRoute, isChatEnabled, isFlagReady, router],
);
useEffect(() => {
if (!isFlagReady) return;
if (isChatEnabled === false) {
router.replace(homepageRoute);
}
}, [homepageRoute, isChatEnabled, isFlagReady, router]);
async function startChatWithPrompt(prompt: string) {
if (!prompt?.trim()) return;
if (state.pageState.type === "creating") return;
if (isCreating) return;
const trimmedPrompt = prompt.trim();
dispatch({
type: "setPageState",
pageState: { type: "creating", prompt: trimmedPrompt },
});
setIsCreating(true);
try {
const sessionResponse = await postV2CreateSession({
@@ -182,23 +72,19 @@ export function useCopilotPage() {
}
const sessionId = sessionResponse.data.id;
setInitialPrompt(sessionId, trimmedPrompt);
dispatch({
type: "setInitialPrompt",
sessionId,
prompt: trimmedPrompt,
await queryClient.invalidateQueries({
queryKey: getGetV2ListSessionsQueryKey(),
});
await setUrlSessionId(sessionId, { shallow: false });
dispatch({
type: "setPageState",
pageState: { type: "chat", sessionId, initialPrompt: trimmedPrompt },
});
await setUrlSessionId(sessionId, { shallow: true });
} catch (error) {
console.error("[CopilotPage] Failed to start chat:", error);
toast({ title: "Failed to start chat", variant: "destructive" });
Sentry.captureException(error);
dispatch({ type: "setPageState", pageState: { type: "welcome" } });
} finally {
setIsCreating(false);
}
}
@@ -211,37 +97,7 @@ export function useCopilotPage() {
}
function handleStreamingChange(isStreamingValue: boolean) {
dispatch({ type: "setStreaming", isStreaming: isStreamingValue });
}
async function proceedWithNewChat() {
dispatch({ type: "setNewChatModalOpen", isOpen: false });
if (newChatContext?.performNewChat) {
newChatContext.performNewChat();
return;
}
try {
await setUrlSessionId(null, { shallow: false });
} catch (error) {
console.error("[CopilotPage] Failed to clear session:", error);
}
router.replace("/copilot");
}
function handleCancelNewChat() {
dispatch({ type: "setNewChatModalOpen", isOpen: false });
}
function handleNewChatModalOpen(isOpen: boolean) {
dispatch({ type: "setNewChatModalOpen", isOpen });
}
function handleNewChatClick() {
if (state.isStreaming) {
dispatch({ type: "setNewChatModalOpen", isOpen: true });
} else {
proceedWithNewChat();
}
setIsStreaming(isStreamingValue);
}
return {
@@ -249,8 +105,8 @@ export function useCopilotPage() {
greetingName,
quickActions,
isLoading: isUserLoading,
pageState: state.pageState,
isNewChatModalOpen: state.isNewChatModalOpen,
hasSession,
initialPrompt,
isReady: isFlagReady && isChatEnabled !== false && isLoggedIn,
},
handlers: {
@@ -258,9 +114,32 @@ export function useCopilotPage() {
startChatWithPrompt,
handleSessionNotFound,
handleStreamingChange,
handleCancelNewChat,
proceedWithNewChat,
handleNewChatModalOpen,
},
};
}
function getInitialPrompt(sessionId: string): string | undefined {
try {
const prompts = JSON.parse(
sessionStorage.get(SessionKey.CHAT_INITIAL_PROMPTS) || "{}",
);
return prompts[sessionId];
} catch {
return undefined;
}
}
function setInitialPrompt(sessionId: string, prompt: string): void {
try {
const prompts = JSON.parse(
sessionStorage.get(SessionKey.CHAT_INITIAL_PROMPTS) || "{}",
);
prompts[sessionId] = prompt;
sessionStorage.set(
SessionKey.CHAT_INITIAL_PROMPTS,
JSON.stringify(prompts),
);
} catch {
// Ignore storage errors
}
}

View File

@@ -0,0 +1,10 @@
import { parseAsString, useQueryState } from "nuqs";
export function useCopilotSessionId() {
const [urlSessionId, setUrlSessionId] = useQueryState(
"sessionId",
parseAsString,
);
return { urlSessionId, setUrlSessionId };
}

View File

@@ -1,80 +0,0 @@
import { parseAsString, useQueryState } from "nuqs";
import { useLayoutEffect } from "react";
import {
getInitialPromptFromState,
type PageState,
shouldResetToWelcome,
} from "./helpers";
interface UseCopilotUrlStateArgs {
pageState: PageState;
initialPrompts: Record<string, string>;
previousSessionId: string | null;
setPageState: (pageState: PageState) => void;
setInitialPrompt: (sessionId: string, prompt: string) => void;
setPreviousSessionId: (sessionId: string | null) => void;
}
export function useCopilotURLState({
pageState,
initialPrompts,
previousSessionId,
setPageState,
setInitialPrompt,
setPreviousSessionId,
}: UseCopilotUrlStateArgs) {
const [urlSessionId, setUrlSessionId] = useQueryState(
"sessionId",
parseAsString,
);
function syncSessionFromUrl() {
if (urlSessionId) {
if (pageState.type === "chat" && pageState.sessionId === urlSessionId) {
setPreviousSessionId(urlSessionId);
return;
}
const storedInitialPrompt = initialPrompts[urlSessionId];
const currentInitialPrompt = getInitialPromptFromState(
pageState,
storedInitialPrompt,
);
if (currentInitialPrompt) {
setInitialPrompt(urlSessionId, currentInitialPrompt);
}
setPageState({
type: "chat",
sessionId: urlSessionId,
initialPrompt: currentInitialPrompt,
});
setPreviousSessionId(urlSessionId);
return;
}
const wasInChat = previousSessionId !== null && pageState.type === "chat";
setPreviousSessionId(null);
if (wasInChat) {
setPageState({ type: "newChat" });
return;
}
if (shouldResetToWelcome(pageState)) {
setPageState({ type: "welcome" });
}
}
useLayoutEffect(syncSessionFromUrl, [
urlSessionId,
pageState.type,
previousSessionId,
initialPrompts,
]);
return {
urlSessionId,
setUrlSessionId,
};
}

View File

@@ -1,10 +1,12 @@
import { Navbar } from "@/components/layout/Navbar/Navbar";
import { NetworkStatusMonitor } from "@/services/network-status/NetworkStatusMonitor";
import { ReactNode } from "react";
import { AdminImpersonationBanner } from "./admin/components/AdminImpersonationBanner";
export default function PlatformLayout({ children }: { children: ReactNode }) {
return (
<main className="flex h-screen w-full flex-col">
<NetworkStatusMonitor />
<Navbar />
<AdminImpersonationBanner />
<section className="flex-1">{children}</section>

View File

@@ -0,0 +1,14 @@
import { cn } from "@/lib/utils";
interface Props extends React.HTMLAttributes<HTMLDivElement> {
className?: string;
}
export function Skeleton({ className, ...props }: Props) {
return (
<div
className={cn("animate-pulse rounded-md bg-zinc-100", className)}
{...props}
/>
);
}

View File

@@ -1,4 +1,4 @@
import { Skeleton } from "@/components/__legacy__/ui/skeleton";
import { Skeleton } from "./Skeleton";
import type { Meta, StoryObj } from "@storybook/nextjs";
const meta: Meta<typeof Skeleton> = {

View File

@@ -1,16 +1,17 @@
"use client";
import { useCopilotSessionId } from "@/app/(platform)/copilot/useCopilotSessionId";
import { useCopilotStore } from "@/app/(platform)/copilot/copilot-page-store";
import { LoadingSpinner } from "@/components/atoms/LoadingSpinner/LoadingSpinner";
import { Text } from "@/components/atoms/Text/Text";
import { cn } from "@/lib/utils";
import { useEffect, useRef } from "react";
import { ChatContainer } from "./components/ChatContainer/ChatContainer";
import { ChatErrorState } from "./components/ChatErrorState/ChatErrorState";
import { ChatLoader } from "./components/ChatLoader/ChatLoader";
import { useChat } from "./useChat";
export interface ChatProps {
className?: string;
urlSessionId?: string | null;
initialPrompt?: string;
onSessionNotFound?: () => void;
onStreamingChange?: (isStreaming: boolean) => void;
@@ -18,12 +19,13 @@ export interface ChatProps {
export function Chat({
className,
urlSessionId,
initialPrompt,
onSessionNotFound,
onStreamingChange,
}: ChatProps) {
const { urlSessionId } = useCopilotSessionId();
const hasHandledNotFoundRef = useRef(false);
const isSwitchingSession = useCopilotStore((s) => s.isSwitchingSession);
const {
messages,
isLoading,
@@ -33,49 +35,59 @@ export function Chat({
sessionId,
createSession,
showLoader,
startPollingForOperation,
} = useChat({ urlSessionId });
useEffect(
function handleMissingSession() {
if (!onSessionNotFound) return;
if (!urlSessionId) return;
if (!isSessionNotFound || isLoading || isCreating) return;
if (hasHandledNotFoundRef.current) return;
hasHandledNotFoundRef.current = true;
onSessionNotFound();
},
[onSessionNotFound, urlSessionId, isSessionNotFound, isLoading, isCreating],
);
useEffect(() => {
if (!onSessionNotFound) return;
if (!urlSessionId) return;
if (!isSessionNotFound || isLoading || isCreating) return;
if (hasHandledNotFoundRef.current) return;
hasHandledNotFoundRef.current = true;
onSessionNotFound();
}, [
onSessionNotFound,
urlSessionId,
isSessionNotFound,
isLoading,
isCreating,
]);
const shouldShowLoader =
(showLoader && (isLoading || isCreating)) || isSwitchingSession;
return (
<div className={cn("flex h-full flex-col", className)}>
{/* Main Content */}
<main className="flex min-h-0 w-full flex-1 flex-col overflow-hidden bg-[#f8f8f9]">
{/* Loading State */}
{showLoader && (isLoading || isCreating) && (
{shouldShowLoader && (
<div className="flex flex-1 items-center justify-center">
<div className="flex flex-col items-center gap-4">
<ChatLoader />
<div className="flex flex-col items-center gap-3">
<LoadingSpinner size="large" className="text-neutral-400" />
<Text variant="body" className="text-zinc-500">
Loading your chats...
{isSwitchingSession
? "Switching chat..."
: "Loading your chat..."}
</Text>
</div>
</div>
)}
{/* Error State */}
{error && !isLoading && (
{error && !isLoading && !isSwitchingSession && (
<ChatErrorState error={error} onRetry={createSession} />
)}
{/* Session Content */}
{sessionId && !isLoading && !error && (
{sessionId && !isLoading && !error && !isSwitchingSession && (
<ChatContainer
sessionId={sessionId}
initialMessages={messages}
initialPrompt={initialPrompt}
className="flex-1"
onStreamingChange={onStreamingChange}
onOperationStarted={startPollingForOperation}
/>
)}
</main>

View File

@@ -0,0 +1,289 @@
"use client";
import { create } from "zustand";
import type {
ActiveStream,
StreamChunk,
StreamCompleteCallback,
StreamResult,
StreamStatus,
} from "./chat-types";
import { executeStream } from "./stream-executor";
const COMPLETED_STREAM_TTL = 5 * 60 * 1000; // 5 minutes
interface ChatStoreState {
activeStreams: Map<string, ActiveStream>;
completedStreams: Map<string, StreamResult>;
activeSessions: Set<string>;
streamCompleteCallbacks: Set<StreamCompleteCallback>;
}
interface ChatStoreActions {
startStream: (
sessionId: string,
message: string,
isUserMessage: boolean,
context?: { url: string; content: string },
onChunk?: (chunk: StreamChunk) => void,
) => Promise<void>;
stopStream: (sessionId: string) => void;
subscribeToStream: (
sessionId: string,
onChunk: (chunk: StreamChunk) => void,
skipReplay?: boolean,
) => () => void;
getStreamStatus: (sessionId: string) => StreamStatus;
getCompletedStream: (sessionId: string) => StreamResult | undefined;
clearCompletedStream: (sessionId: string) => void;
isStreaming: (sessionId: string) => boolean;
registerActiveSession: (sessionId: string) => void;
unregisterActiveSession: (sessionId: string) => void;
isSessionActive: (sessionId: string) => boolean;
onStreamComplete: (callback: StreamCompleteCallback) => () => void;
}
type ChatStore = ChatStoreState & ChatStoreActions;
function notifyStreamComplete(
callbacks: Set<StreamCompleteCallback>,
sessionId: string,
) {
for (const callback of callbacks) {
try {
callback(sessionId);
} catch (err) {
console.warn("[ChatStore] Stream complete callback error:", err);
}
}
}
function cleanupExpiredStreams(
completedStreams: Map<string, StreamResult>,
): Map<string, StreamResult> {
const now = Date.now();
const cleaned = new Map(completedStreams);
for (const [sessionId, result] of cleaned) {
if (now - result.completedAt > COMPLETED_STREAM_TTL) {
cleaned.delete(sessionId);
}
}
return cleaned;
}
export const useChatStore = create<ChatStore>((set, get) => ({
activeStreams: new Map(),
completedStreams: new Map(),
activeSessions: new Set(),
streamCompleteCallbacks: new Set(),
startStream: async function startStream(
sessionId,
message,
isUserMessage,
context,
onChunk,
) {
const state = get();
const newActiveStreams = new Map(state.activeStreams);
let newCompletedStreams = new Map(state.completedStreams);
const callbacks = state.streamCompleteCallbacks;
const existingStream = newActiveStreams.get(sessionId);
if (existingStream) {
existingStream.abortController.abort();
const normalizedStatus =
existingStream.status === "streaming"
? "completed"
: existingStream.status;
const result: StreamResult = {
sessionId,
status: normalizedStatus,
chunks: existingStream.chunks,
completedAt: Date.now(),
error: existingStream.error,
};
newCompletedStreams.set(sessionId, result);
newActiveStreams.delete(sessionId);
newCompletedStreams = cleanupExpiredStreams(newCompletedStreams);
if (normalizedStatus === "completed" || normalizedStatus === "error") {
notifyStreamComplete(callbacks, sessionId);
}
}
const abortController = new AbortController();
const initialCallbacks = new Set<(chunk: StreamChunk) => void>();
if (onChunk) initialCallbacks.add(onChunk);
const stream: ActiveStream = {
sessionId,
abortController,
status: "streaming",
startedAt: Date.now(),
chunks: [],
onChunkCallbacks: initialCallbacks,
};
newActiveStreams.set(sessionId, stream);
set({
activeStreams: newActiveStreams,
completedStreams: newCompletedStreams,
});
try {
await executeStream(stream, message, isUserMessage, context);
} finally {
if (onChunk) stream.onChunkCallbacks.delete(onChunk);
if (stream.status !== "streaming") {
const currentState = get();
const finalActiveStreams = new Map(currentState.activeStreams);
let finalCompletedStreams = new Map(currentState.completedStreams);
const storedStream = finalActiveStreams.get(sessionId);
if (storedStream === stream) {
const result: StreamResult = {
sessionId,
status: stream.status,
chunks: stream.chunks,
completedAt: Date.now(),
error: stream.error,
};
finalCompletedStreams.set(sessionId, result);
finalActiveStreams.delete(sessionId);
finalCompletedStreams = cleanupExpiredStreams(finalCompletedStreams);
set({
activeStreams: finalActiveStreams,
completedStreams: finalCompletedStreams,
});
if (stream.status === "completed" || stream.status === "error") {
notifyStreamComplete(
currentState.streamCompleteCallbacks,
sessionId,
);
}
}
}
}
},
stopStream: function stopStream(sessionId) {
const state = get();
const stream = state.activeStreams.get(sessionId);
if (!stream) return;
stream.abortController.abort();
stream.status = "completed";
const newActiveStreams = new Map(state.activeStreams);
let newCompletedStreams = new Map(state.completedStreams);
const result: StreamResult = {
sessionId,
status: stream.status,
chunks: stream.chunks,
completedAt: Date.now(),
error: stream.error,
};
newCompletedStreams.set(sessionId, result);
newActiveStreams.delete(sessionId);
newCompletedStreams = cleanupExpiredStreams(newCompletedStreams);
set({
activeStreams: newActiveStreams,
completedStreams: newCompletedStreams,
});
notifyStreamComplete(state.streamCompleteCallbacks, sessionId);
},
subscribeToStream: function subscribeToStream(
sessionId,
onChunk,
skipReplay = false,
) {
const state = get();
const stream = state.activeStreams.get(sessionId);
if (stream) {
if (!skipReplay) {
for (const chunk of stream.chunks) {
onChunk(chunk);
}
}
stream.onChunkCallbacks.add(onChunk);
return function unsubscribe() {
stream.onChunkCallbacks.delete(onChunk);
};
}
return function noop() {};
},
getStreamStatus: function getStreamStatus(sessionId) {
const { activeStreams, completedStreams } = get();
const active = activeStreams.get(sessionId);
if (active) return active.status;
const completed = completedStreams.get(sessionId);
if (completed) return completed.status;
return "idle";
},
getCompletedStream: function getCompletedStream(sessionId) {
return get().completedStreams.get(sessionId);
},
clearCompletedStream: function clearCompletedStream(sessionId) {
const state = get();
if (!state.completedStreams.has(sessionId)) return;
const newCompletedStreams = new Map(state.completedStreams);
newCompletedStreams.delete(sessionId);
set({ completedStreams: newCompletedStreams });
},
isStreaming: function isStreaming(sessionId) {
const stream = get().activeStreams.get(sessionId);
return stream?.status === "streaming";
},
registerActiveSession: function registerActiveSession(sessionId) {
const state = get();
if (state.activeSessions.has(sessionId)) return;
const newActiveSessions = new Set(state.activeSessions);
newActiveSessions.add(sessionId);
set({ activeSessions: newActiveSessions });
},
unregisterActiveSession: function unregisterActiveSession(sessionId) {
const state = get();
if (!state.activeSessions.has(sessionId)) return;
const newActiveSessions = new Set(state.activeSessions);
newActiveSessions.delete(sessionId);
set({ activeSessions: newActiveSessions });
},
isSessionActive: function isSessionActive(sessionId) {
return get().activeSessions.has(sessionId);
},
onStreamComplete: function onStreamComplete(callback) {
const state = get();
const newCallbacks = new Set(state.streamCompleteCallbacks);
newCallbacks.add(callback);
set({ streamCompleteCallbacks: newCallbacks });
return function unsubscribe() {
const currentState = get();
const cleanedCallbacks = new Set(currentState.streamCompleteCallbacks);
cleanedCallbacks.delete(callback);
set({ streamCompleteCallbacks: cleanedCallbacks });
};
},
}));

View File

@@ -0,0 +1,94 @@
import type { ToolArguments, ToolResult } from "@/types/chat";
export type StreamStatus = "idle" | "streaming" | "completed" | "error";
export interface StreamChunk {
type:
| "text_chunk"
| "text_ended"
| "tool_call"
| "tool_call_start"
| "tool_response"
| "login_needed"
| "need_login"
| "credentials_needed"
| "error"
| "usage"
| "stream_end";
timestamp?: string;
content?: string;
message?: string;
code?: string;
details?: Record<string, unknown>;
tool_id?: string;
tool_name?: string;
arguments?: ToolArguments;
result?: ToolResult;
success?: boolean;
idx?: number;
session_id?: string;
agent_info?: {
graph_id: string;
name: string;
trigger_type: string;
};
provider?: string;
provider_name?: string;
credential_type?: string;
scopes?: string[];
title?: string;
[key: string]: unknown;
}
export type VercelStreamChunk =
| { type: "start"; messageId: string }
| { type: "finish" }
| { type: "text-start"; id: string }
| { type: "text-delta"; id: string; delta: string }
| { type: "text-end"; id: string }
| { type: "tool-input-start"; toolCallId: string; toolName: string }
| {
type: "tool-input-available";
toolCallId: string;
toolName: string;
input: Record<string, unknown>;
}
| {
type: "tool-output-available";
toolCallId: string;
toolName?: string;
output: unknown;
success?: boolean;
}
| {
type: "usage";
promptTokens: number;
completionTokens: number;
totalTokens: number;
}
| {
type: "error";
errorText: string;
code?: string;
details?: Record<string, unknown>;
};
export interface ActiveStream {
sessionId: string;
abortController: AbortController;
status: StreamStatus;
startedAt: number;
chunks: StreamChunk[];
error?: Error;
onChunkCallbacks: Set<(chunk: StreamChunk) => void>;
}
export interface StreamResult {
sessionId: string;
status: StreamStatus;
chunks: StreamChunk[];
completedAt: number;
error?: Error;
}
export type StreamCompleteCallback = (sessionId: string) => void;

View File

@@ -4,6 +4,7 @@ import { Text } from "@/components/atoms/Text/Text";
import { Dialog } from "@/components/molecules/Dialog/Dialog";
import { useBreakpoint } from "@/lib/hooks/useBreakpoint";
import { cn } from "@/lib/utils";
import { GlobeHemisphereEastIcon } from "@phosphor-icons/react";
import { useEffect } from "react";
import { ChatInput } from "../ChatInput/ChatInput";
import { MessageList } from "../MessageList/MessageList";
@@ -15,6 +16,7 @@ export interface ChatContainerProps {
initialPrompt?: string;
className?: string;
onStreamingChange?: (isStreaming: boolean) => void;
onOperationStarted?: () => void;
}
export function ChatContainer({
@@ -23,6 +25,7 @@ export function ChatContainer({
initialPrompt,
className,
onStreamingChange,
onOperationStarted,
}: ChatContainerProps) {
const {
messages,
@@ -37,6 +40,7 @@ export function ChatContainer({
sessionId,
initialMessages,
initialPrompt,
onOperationStarted,
});
useEffect(() => {
@@ -55,24 +59,37 @@ export function ChatContainer({
)}
>
<Dialog
title="Service unavailable"
title={
<div className="flex items-center gap-2">
<GlobeHemisphereEastIcon className="size-6" />
<Text
variant="body"
className="text-md font-poppins leading-none md:text-lg"
>
Service unavailable
</Text>
</div>
}
controlled={{
isOpen: isRegionBlockedModalOpen,
set: handleRegionModalOpenChange,
}}
onClose={handleRegionModalClose}
styling={{ maxWidth: 550, width: "100%", minWidth: "auto" }}
>
<Dialog.Content>
<div className="flex flex-col gap-4">
<div className="flex flex-col gap-8">
<Text variant="body">
This model is not available in your region. Please connect via VPN
and try again.
The Autogpt AI model is not available in your region or your
connection is blocking it. Please try again with a different
connection.
</Text>
<div className="flex justify-end">
<div className="flex justify-center">
<Button
type="button"
variant="primary"
onClick={handleRegionModalClose}
className="w-full"
>
Got it
</Button>

View File

@@ -1,5 +1,5 @@
import { toast } from "sonner";
import { StreamChunk } from "../../useChatStream";
import type { StreamChunk } from "../../chat-types";
import type { HandlerDependencies } from "./handlers";
import {
handleError,

View File

@@ -22,6 +22,7 @@ export interface HandlerDependencies {
setIsStreamingInitiated: Dispatch<SetStateAction<boolean>>;
setIsRegionBlockedModalOpen: Dispatch<SetStateAction<boolean>>;
sessionId: string;
onOperationStarted?: () => void;
}
export function isRegionBlockedError(chunk: StreamChunk): boolean {
@@ -48,6 +49,15 @@ export function handleTextEnded(
const completedText = deps.streamingChunksRef.current.join("");
if (completedText.trim()) {
deps.setMessages((prev) => {
// Check if this exact message already exists to prevent duplicates
const exists = prev.some(
(msg) =>
msg.type === "message" &&
msg.role === "assistant" &&
msg.content === completedText,
);
if (exists) return prev;
const assistantMessage: ChatMessageData = {
type: "message",
role: "assistant",
@@ -154,6 +164,11 @@ export function handleToolResponse(
}
return;
}
// Trigger polling when operation_started is received
if (responseMessage.type === "operation_started") {
deps.onOperationStarted?.();
}
deps.setMessages((prev) => {
const toolCallIndex = prev.findIndex(
(msg) => msg.type === "tool_call" && msg.toolId === chunk.tool_id,
@@ -203,13 +218,24 @@ export function handleStreamEnd(
]);
}
if (completedContent.trim()) {
const assistantMessage: ChatMessageData = {
type: "message",
role: "assistant",
content: completedContent,
timestamp: new Date(),
};
deps.setMessages((prev) => [...prev, assistantMessage]);
deps.setMessages((prev) => {
// Check if this exact message already exists to prevent duplicates
const exists = prev.some(
(msg) =>
msg.type === "message" &&
msg.role === "assistant" &&
msg.content === completedContent,
);
if (exists) return prev;
const assistantMessage: ChatMessageData = {
type: "message",
role: "assistant",
content: completedContent,
timestamp: new Date(),
};
return [...prev, assistantMessage];
});
}
deps.setStreamingChunks([]);
deps.streamingChunksRef.current = [];

View File

@@ -1,7 +1,118 @@
import type { SessionDetailResponse } from "@/app/api/__generated__/models/sessionDetailResponse";
import { SessionKey, sessionStorage } from "@/services/storage/session-storage";
import type { ToolResult } from "@/types/chat";
import type { ChatMessageData } from "../ChatMessage/useChatMessage";
export function processInitialMessages(
initialMessages: SessionDetailResponse["messages"],
): ChatMessageData[] {
const processedMessages: ChatMessageData[] = [];
const toolCallMap = new Map<string, string>();
for (const msg of initialMessages) {
if (!isValidMessage(msg)) {
console.warn("Invalid message structure from backend:", msg);
continue;
}
let content = String(msg.content || "");
const role = String(msg.role || "assistant").toLowerCase();
const toolCalls = msg.tool_calls;
const timestamp = msg.timestamp
? new Date(msg.timestamp as string)
: undefined;
if (role === "user") {
content = removePageContext(content);
if (!content.trim()) continue;
processedMessages.push({
type: "message",
role: "user",
content,
timestamp,
});
continue;
}
if (role === "assistant") {
content = content
.replace(/<thinking>[\s\S]*?<\/thinking>/gi, "")
.replace(/<internal_reasoning>[\s\S]*?<\/internal_reasoning>/gi, "")
.trim();
if (toolCalls && isToolCallArray(toolCalls) && toolCalls.length > 0) {
for (const toolCall of toolCalls) {
const toolName = toolCall.function.name;
const toolId = toolCall.id;
toolCallMap.set(toolId, toolName);
try {
const args = JSON.parse(toolCall.function.arguments || "{}");
processedMessages.push({
type: "tool_call",
toolId,
toolName,
arguments: args,
timestamp,
});
} catch (err) {
console.warn("Failed to parse tool call arguments:", err);
processedMessages.push({
type: "tool_call",
toolId,
toolName,
arguments: {},
timestamp,
});
}
}
if (content.trim()) {
processedMessages.push({
type: "message",
role: "assistant",
content,
timestamp,
});
}
} else if (content.trim()) {
processedMessages.push({
type: "message",
role: "assistant",
content,
timestamp,
});
}
continue;
}
if (role === "tool") {
const toolCallId = (msg.tool_call_id as string) || "";
const toolName = toolCallMap.get(toolCallId) || "unknown";
const toolResponse = parseToolResponse(
content,
toolCallId,
toolName,
timestamp,
);
if (toolResponse) {
processedMessages.push(toolResponse);
}
continue;
}
if (content.trim()) {
processedMessages.push({
type: "message",
role: role as "user" | "assistant" | "system",
content,
timestamp,
});
}
}
return processedMessages;
}
export function hasSentInitialPrompt(sessionId: string): boolean {
try {
const sent = JSON.parse(
@@ -193,6 +304,7 @@ export function parseToolResponse(
if (isAgentArray(agentsData)) {
return {
type: "agent_carousel",
toolId,
toolName: "agent_carousel",
agents: agentsData,
totalCount: parsedResult.total_count as number | undefined,
@@ -205,6 +317,7 @@ export function parseToolResponse(
if (responseType === "execution_started") {
return {
type: "execution_started",
toolId,
toolName: "execution_started",
executionId: (parsedResult.execution_id as string) || "",
agentName: (parsedResult.graph_name as string) || undefined,
@@ -230,6 +343,41 @@ export function parseToolResponse(
timestamp: timestamp || new Date(),
};
}
if (responseType === "operation_started") {
return {
type: "operation_started",
toolName: (parsedResult.tool_name as string) || toolName,
toolId,
operationId: (parsedResult.operation_id as string) || "",
message:
(parsedResult.message as string) ||
"Operation started. You can close this tab.",
timestamp: timestamp || new Date(),
};
}
if (responseType === "operation_pending") {
return {
type: "operation_pending",
toolName: (parsedResult.tool_name as string) || toolName,
toolId,
operationId: (parsedResult.operation_id as string) || "",
message:
(parsedResult.message as string) ||
"Operation in progress. Please wait...",
timestamp: timestamp || new Date(),
};
}
if (responseType === "operation_in_progress") {
return {
type: "operation_in_progress",
toolName: (parsedResult.tool_name as string) || toolName,
toolCallId: (parsedResult.tool_call_id as string) || toolId,
message:
(parsedResult.message as string) ||
"Operation already in progress. Please wait...",
timestamp: timestamp || new Date(),
};
}
if (responseType === "need_login") {
return {
type: "login_needed",

View File

@@ -1,5 +1,6 @@
import type { SessionDetailResponse } from "@/app/api/__generated__/models/sessionDetailResponse";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { useEffect, useMemo, useRef, useState } from "react";
import { useChatStore } from "../../chat-store";
import { toast } from "sonner";
import { useChatStream } from "../../useChatStream";
import { usePageContext } from "../../usePageContext";
@@ -9,23 +10,44 @@ import {
createUserMessage,
filterAuthMessages,
hasSentInitialPrompt,
isToolCallArray,
isValidMessage,
markInitialPromptSent,
parseToolResponse,
removePageContext,
processInitialMessages,
} from "./helpers";
// Helper to generate deduplication key for a message
function getMessageKey(msg: ChatMessageData): string {
if (msg.type === "message") {
// Don't include timestamp - dedupe by role + content only
// This handles the case where local and server timestamps differ
// Server messages are authoritative, so duplicates from local state are filtered
return `msg:${msg.role}:${msg.content}`;
} else if (msg.type === "tool_call") {
return `toolcall:${msg.toolId}`;
} else if (msg.type === "tool_response") {
return `toolresponse:${(msg as any).toolId}`;
} else if (
msg.type === "operation_started" ||
msg.type === "operation_pending" ||
msg.type === "operation_in_progress"
) {
return `op:${(msg as any).toolId || (msg as any).operationId || (msg as any).toolCallId || ""}:${msg.toolName}`;
} else {
return `${msg.type}:${JSON.stringify(msg).slice(0, 100)}`;
}
}
interface Args {
sessionId: string | null;
initialMessages: SessionDetailResponse["messages"];
initialPrompt?: string;
onOperationStarted?: () => void;
}
export function useChatContainer({
sessionId,
initialMessages,
initialPrompt,
onOperationStarted,
}: Args) {
const [messages, setMessages] = useState<ChatMessageData[]>([]);
const [streamingChunks, setStreamingChunks] = useState<string[]>([]);
@@ -41,11 +63,18 @@ export function useChatContainer({
sendMessage: sendStreamMessage,
stopStreaming,
} = useChatStream();
const activeStreams = useChatStore((s) => s.activeStreams);
const subscribeToStream = useChatStore((s) => s.subscribeToStream);
const isStreaming = isStreamingInitiated || hasTextChunks;
useEffect(() => {
if (sessionId !== previousSessionIdRef.current) {
stopStreaming(previousSessionIdRef.current ?? undefined, true);
useEffect(
function handleSessionChange() {
if (sessionId === previousSessionIdRef.current) return;
const prevSession = previousSessionIdRef.current;
if (prevSession) {
stopStreaming(prevSession);
}
previousSessionIdRef.current = sessionId;
setMessages([]);
setStreamingChunks([]);
@@ -53,138 +82,11 @@ export function useChatContainer({
setHasTextChunks(false);
setIsStreamingInitiated(false);
hasResponseRef.current = false;
}
}, [sessionId, stopStreaming]);
const allMessages = useMemo(() => {
const processedInitialMessages: ChatMessageData[] = [];
const toolCallMap = new Map<string, string>();
if (!sessionId) return;
for (const msg of initialMessages) {
if (!isValidMessage(msg)) {
console.warn("Invalid message structure from backend:", msg);
continue;
}
let content = String(msg.content || "");
const role = String(msg.role || "assistant").toLowerCase();
const toolCalls = msg.tool_calls;
const timestamp = msg.timestamp
? new Date(msg.timestamp as string)
: undefined;
if (role === "user") {
content = removePageContext(content);
if (!content.trim()) continue;
processedInitialMessages.push({
type: "message",
role: "user",
content,
timestamp,
});
continue;
}
if (role === "assistant") {
content = content
.replace(/<thinking>[\s\S]*?<\/thinking>/gi, "")
.trim();
if (toolCalls && isToolCallArray(toolCalls) && toolCalls.length > 0) {
for (const toolCall of toolCalls) {
const toolName = toolCall.function.name;
const toolId = toolCall.id;
toolCallMap.set(toolId, toolName);
try {
const args = JSON.parse(toolCall.function.arguments || "{}");
processedInitialMessages.push({
type: "tool_call",
toolId,
toolName,
arguments: args,
timestamp,
});
} catch (err) {
console.warn("Failed to parse tool call arguments:", err);
processedInitialMessages.push({
type: "tool_call",
toolId,
toolName,
arguments: {},
timestamp,
});
}
}
if (content.trim()) {
processedInitialMessages.push({
type: "message",
role: "assistant",
content,
timestamp,
});
}
} else if (content.trim()) {
processedInitialMessages.push({
type: "message",
role: "assistant",
content,
timestamp,
});
}
continue;
}
if (role === "tool") {
const toolCallId = (msg.tool_call_id as string) || "";
const toolName = toolCallMap.get(toolCallId) || "unknown";
const toolResponse = parseToolResponse(
content,
toolCallId,
toolName,
timestamp,
);
if (toolResponse) {
processedInitialMessages.push(toolResponse);
}
continue;
}
if (content.trim()) {
processedInitialMessages.push({
type: "message",
role: role as "user" | "assistant" | "system",
content,
timestamp,
});
}
}
return [...processedInitialMessages, ...messages];
}, [initialMessages, messages]);
const sendMessage = useCallback(
async function sendMessage(
content: string,
isUserMessage: boolean = true,
context?: { url: string; content: string },
) {
if (!sessionId) {
console.error("[useChatContainer] Cannot send message: no session ID");
return;
}
setIsRegionBlockedModalOpen(false);
if (isUserMessage) {
const userMessage = createUserMessage(content);
setMessages((prev) => [...filterAuthMessages(prev), userMessage]);
} else {
setMessages((prev) => filterAuthMessages(prev));
}
setStreamingChunks([]);
streamingChunksRef.current = [];
setHasTextChunks(false);
setIsStreamingInitiated(true);
hasResponseRef.current = false;
const activeStream = activeStreams.get(sessionId);
if (!activeStream || activeStream.status !== "streaming") return;
const dispatcher = createStreamEventDispatcher({
setHasTextChunks,
@@ -195,44 +97,170 @@ export function useChatContainer({
setIsRegionBlockedModalOpen,
sessionId,
setIsStreamingInitiated,
onOperationStarted,
});
try {
await sendStreamMessage(
sessionId,
content,
dispatcher,
isUserMessage,
context,
);
} catch (err) {
console.error("[useChatContainer] Failed to send message:", err);
setIsStreamingInitiated(false);
// Don't show error toast for AbortError (expected during cleanup)
if (err instanceof Error && err.name === "AbortError") return;
const errorMessage =
err instanceof Error ? err.message : "Failed to send message";
toast.error("Failed to send message", {
description: errorMessage,
});
}
setIsStreamingInitiated(true);
const skipReplay = initialMessages.length > 0;
return subscribeToStream(sessionId, dispatcher, skipReplay);
},
[sessionId, sendStreamMessage],
[
sessionId,
stopStreaming,
activeStreams,
subscribeToStream,
onOperationStarted,
],
);
const handleStopStreaming = useCallback(() => {
// Collect toolIds from completed tool results in initialMessages
// Used to filter out operation messages when their results arrive
const completedToolIds = useMemo(() => {
const processedInitial = processInitialMessages(initialMessages);
const ids = new Set<string>();
for (const msg of processedInitial) {
if (
msg.type === "tool_response" ||
msg.type === "agent_carousel" ||
msg.type === "execution_started"
) {
const toolId = (msg as any).toolId;
if (toolId) {
ids.add(toolId);
}
}
}
return ids;
}, [initialMessages]);
// Clean up local operation messages when their completed results arrive from polling
// This effect runs when completedToolIds changes (i.e., when polling brings new results)
useEffect(
function cleanupCompletedOperations() {
if (completedToolIds.size === 0) return;
setMessages((prev) => {
const filtered = prev.filter((msg) => {
if (
msg.type === "operation_started" ||
msg.type === "operation_pending" ||
msg.type === "operation_in_progress"
) {
const toolId = (msg as any).toolId || (msg as any).toolCallId;
if (toolId && completedToolIds.has(toolId)) {
return false; // Remove - operation completed
}
}
return true;
});
// Only update state if something was actually filtered
return filtered.length === prev.length ? prev : filtered;
});
},
[completedToolIds],
);
// Combine initial messages from backend with local streaming messages,
// Server messages maintain correct order; only append truly new local messages
const allMessages = useMemo(() => {
const processedInitial = processInitialMessages(initialMessages);
// Build a set of keys from server messages for deduplication
const serverKeys = new Set<string>();
for (const msg of processedInitial) {
serverKeys.add(getMessageKey(msg));
}
// Filter local messages: remove duplicates and completed operation messages
const newLocalMessages = messages.filter((msg) => {
// Remove operation messages for completed tools
if (
msg.type === "operation_started" ||
msg.type === "operation_pending" ||
msg.type === "operation_in_progress"
) {
const toolId = (msg as any).toolId || (msg as any).toolCallId;
if (toolId && completedToolIds.has(toolId)) {
return false;
}
}
// Remove messages that already exist in server data
const key = getMessageKey(msg);
return !serverKeys.has(key);
});
// Server messages first (correct order), then new local messages
return [...processedInitial, ...newLocalMessages];
}, [initialMessages, messages, completedToolIds]);
async function sendMessage(
content: string,
isUserMessage: boolean = true,
context?: { url: string; content: string },
) {
if (!sessionId) {
console.error("[useChatContainer] Cannot send message: no session ID");
return;
}
setIsRegionBlockedModalOpen(false);
if (isUserMessage) {
const userMessage = createUserMessage(content);
setMessages((prev) => [...filterAuthMessages(prev), userMessage]);
} else {
setMessages((prev) => filterAuthMessages(prev));
}
setStreamingChunks([]);
streamingChunksRef.current = [];
setHasTextChunks(false);
setIsStreamingInitiated(true);
hasResponseRef.current = false;
const dispatcher = createStreamEventDispatcher({
setHasTextChunks,
setStreamingChunks,
streamingChunksRef,
hasResponseRef,
setMessages,
setIsRegionBlockedModalOpen,
sessionId,
setIsStreamingInitiated,
onOperationStarted,
});
try {
await sendStreamMessage(
sessionId,
content,
dispatcher,
isUserMessage,
context,
);
} catch (err) {
console.error("[useChatContainer] Failed to send message:", err);
setIsStreamingInitiated(false);
if (err instanceof Error && err.name === "AbortError") return;
const errorMessage =
err instanceof Error ? err.message : "Failed to send message";
toast.error("Failed to send message", {
description: errorMessage,
});
}
}
function handleStopStreaming() {
stopStreaming();
setStreamingChunks([]);
streamingChunksRef.current = [];
setHasTextChunks(false);
setIsStreamingInitiated(false);
}, [stopStreaming]);
}
const { capturePageContext } = usePageContext();
const sendMessageRef = useRef(sendMessage);
sendMessageRef.current = sendMessage;
// Send initial prompt if provided (for new sessions from homepage)
useEffect(
function handleInitialPrompt() {
if (!initialPrompt || !sessionId) return;
@@ -241,15 +269,9 @@ export function useChatContainer({
markInitialPromptSent(sessionId);
const context = capturePageContext();
sendMessage(initialPrompt, true, context);
sendMessageRef.current(initialPrompt, true, context);
},
[
initialPrompt,
sessionId,
initialMessages.length,
sendMessage,
capturePageContext,
],
[initialPrompt, sessionId, initialMessages.length, capturePageContext],
);
async function sendMessageWithContext(

View File

@@ -21,7 +21,7 @@ export function ChatInput({
className,
}: Props) {
const inputId = "chat-input";
const { value, setValue, handleKeyDown, handleSend, hasMultipleLines } =
const { value, handleKeyDown, handleSubmit, handleChange, hasMultipleLines } =
useChatInput({
onSend,
disabled: disabled || isStreaming,
@@ -29,15 +29,6 @@ export function ChatInput({
inputId,
});
function handleSubmit(e: React.FormEvent<HTMLFormElement>) {
e.preventDefault();
handleSend();
}
function handleChange(e: React.ChangeEvent<HTMLTextAreaElement>) {
setValue(e.target.value);
}
return (
<form onSubmit={handleSubmit} className={cn("relative flex-1", className)}>
<div className="relative">

View File

@@ -1,4 +1,10 @@
import { KeyboardEvent, useCallback, useEffect, useState } from "react";
import {
ChangeEvent,
FormEvent,
KeyboardEvent,
useEffect,
useState,
} from "react";
interface UseChatInputArgs {
onSend: (message: string) => void;
@@ -16,6 +22,23 @@ export function useChatInput({
const [value, setValue] = useState("");
const [hasMultipleLines, setHasMultipleLines] = useState(false);
useEffect(
function focusOnMount() {
const textarea = document.getElementById(inputId) as HTMLTextAreaElement;
if (textarea) textarea.focus();
},
[inputId],
);
useEffect(
function focusWhenEnabled() {
if (disabled) return;
const textarea = document.getElementById(inputId) as HTMLTextAreaElement;
if (textarea) textarea.focus();
},
[disabled, inputId],
);
useEffect(() => {
const textarea = document.getElementById(inputId) as HTMLTextAreaElement;
const wrapper = document.getElementById(
@@ -77,7 +100,7 @@ export function useChatInput({
}
}, [value, maxRows, inputId]);
const handleSend = useCallback(() => {
const handleSend = () => {
if (disabled || !value.trim()) return;
onSend(value.trim());
setValue("");
@@ -93,23 +116,31 @@ export function useChatInput({
wrapper.style.height = "";
wrapper.style.maxHeight = "";
}
}, [value, onSend, disabled, inputId]);
};
const handleKeyDown = useCallback(
(event: KeyboardEvent<HTMLTextAreaElement>) => {
if (event.key === "Enter" && !event.shiftKey) {
event.preventDefault();
handleSend();
}
},
[handleSend],
);
function handleKeyDown(event: KeyboardEvent<HTMLTextAreaElement>) {
if (event.key === "Enter" && !event.shiftKey) {
event.preventDefault();
handleSend();
}
}
function handleSubmit(e: FormEvent<HTMLFormElement>) {
e.preventDefault();
handleSend();
}
function handleChange(e: ChangeEvent<HTMLTextAreaElement>) {
setValue(e.target.value);
}
return {
value,
setValue,
handleKeyDown,
handleSend,
handleSubmit,
handleChange,
hasMultipleLines,
};
}

View File

@@ -16,6 +16,7 @@ import { AuthPromptWidget } from "../AuthPromptWidget/AuthPromptWidget";
import { ChatCredentialsSetup } from "../ChatCredentialsSetup/ChatCredentialsSetup";
import { ClarificationQuestionsWidget } from "../ClarificationQuestionsWidget/ClarificationQuestionsWidget";
import { ExecutionStartedMessage } from "../ExecutionStartedMessage/ExecutionStartedMessage";
import { PendingOperationWidget } from "../PendingOperationWidget/PendingOperationWidget";
import { MarkdownContent } from "../MarkdownContent/MarkdownContent";
import { NoResultsMessage } from "../NoResultsMessage/NoResultsMessage";
import { ToolCallMessage } from "../ToolCallMessage/ToolCallMessage";
@@ -71,6 +72,9 @@ export function ChatMessage({
isLoginNeeded,
isCredentialsNeeded,
isClarificationNeeded,
isOperationStarted,
isOperationPending,
isOperationInProgress,
} = useChatMessage(message);
const displayContent = getDisplayContent(message, isUser);
@@ -126,10 +130,6 @@ export function ChatMessage({
[displayContent, message],
);
function isLongResponse(content: string): boolean {
return content.split("\n").length > 5;
}
const handleTryAgain = useCallback(() => {
if (message.type !== "message" || !onSendMessage) return;
onSendMessage(message.content, message.role === "user");
@@ -294,6 +294,42 @@ export function ChatMessage({
);
}
// Render operation_started messages (long-running background operations)
if (isOperationStarted && message.type === "operation_started") {
return (
<PendingOperationWidget
status="started"
message={message.message}
toolName={message.toolName}
className={className}
/>
);
}
// Render operation_pending messages (operations in progress when refreshing)
if (isOperationPending && message.type === "operation_pending") {
return (
<PendingOperationWidget
status="pending"
message={message.message}
toolName={message.toolName}
className={className}
/>
);
}
// Render operation_in_progress messages (duplicate request while operation running)
if (isOperationInProgress && message.type === "operation_in_progress") {
return (
<PendingOperationWidget
status="in_progress"
message={message.message}
toolName={message.toolName}
className={className}
/>
);
}
// Render tool response messages (but skip agent_output if it's being rendered inside assistant message)
if (isToolResponse && message.type === "tool_response") {
return (
@@ -358,7 +394,7 @@ export function ChatMessage({
<ArrowsClockwiseIcon className="size-4 text-zinc-600" />
</Button>
)}
{!isUser && isFinalMessage && isLongResponse(displayContent) && (
{!isUser && isFinalMessage && !isStreaming && (
<Button
variant="ghost"
size="icon"

View File

@@ -61,6 +61,7 @@ export type ChatMessageData =
}
| {
type: "agent_carousel";
toolId: string;
toolName: string;
agents: Array<{
id: string;
@@ -74,6 +75,7 @@ export type ChatMessageData =
}
| {
type: "execution_started";
toolId: string;
toolName: string;
executionId: string;
agentName?: string;
@@ -103,6 +105,29 @@ export type ChatMessageData =
message: string;
sessionId: string;
timestamp?: string | Date;
}
| {
type: "operation_started";
toolName: string;
toolId: string;
operationId: string;
message: string;
timestamp?: string | Date;
}
| {
type: "operation_pending";
toolName: string;
toolId: string;
operationId: string;
message: string;
timestamp?: string | Date;
}
| {
type: "operation_in_progress";
toolName: string;
toolCallId: string;
message: string;
timestamp?: string | Date;
};
export function useChatMessage(message: ChatMessageData) {
@@ -124,5 +149,8 @@ export function useChatMessage(message: ChatMessageData) {
isExecutionStarted: message.type === "execution_started",
isInputsNeeded: message.type === "inputs_needed",
isClarificationNeeded: message.type === "clarification_needed",
isOperationStarted: message.type === "operation_started",
isOperationPending: message.type === "operation_pending",
isOperationInProgress: message.type === "operation_in_progress",
};
}

View File

@@ -30,6 +30,7 @@ export function ClarificationQuestionsWidget({
className,
}: Props) {
const [answers, setAnswers] = useState<Record<string, string>>({});
const [isSubmitted, setIsSubmitted] = useState(false);
function handleAnswerChange(keyword: string, value: string) {
setAnswers((prev) => ({ ...prev, [keyword]: value }));
@@ -41,11 +42,42 @@ export function ClarificationQuestionsWidget({
if (!allAnswered) {
return;
}
setIsSubmitted(true);
onSubmitAnswers(answers);
}
const allAnswered = questions.every((q) => answers[q.keyword]?.trim());
// Show submitted state after answers are submitted
if (isSubmitted) {
return (
<div
className={cn(
"group relative flex w-full justify-start gap-3 px-4 py-3",
className,
)}
>
<div className="flex w-full max-w-3xl gap-3">
<div className="flex-shrink-0">
<div className="flex h-7 w-7 items-center justify-center rounded-lg bg-green-500">
<CheckCircleIcon className="h-4 w-4 text-white" weight="bold" />
</div>
</div>
<div className="flex min-w-0 flex-1 flex-col">
<Card className="p-4">
<Text variant="h4" className="mb-1 text-slate-900">
Answers submitted
</Text>
<Text variant="small" className="text-slate-600">
Processing your responses...
</Text>
</Card>
</div>
</div>
</div>
);
}
return (
<div
className={cn(

View File

@@ -1,7 +1,5 @@
import { AIChatBubble } from "../../../AIChatBubble/AIChatBubble";
import type { ChatMessageData } from "../../../ChatMessage/useChatMessage";
import { MarkdownContent } from "../../../MarkdownContent/MarkdownContent";
import { formatToolResponse } from "../../../ToolResponseMessage/helpers";
import { ToolResponseMessage } from "../../../ToolResponseMessage/ToolResponseMessage";
import { shouldSkipAgentOutput } from "../../helpers";
export interface LastToolResponseProps {
@@ -15,16 +13,15 @@ export function LastToolResponse({
}: LastToolResponseProps) {
if (message.type !== "tool_response") return null;
// Skip if this is an agent_output that should be rendered inside assistant message
if (shouldSkipAgentOutput(message, prevMessage)) return null;
const formattedText = formatToolResponse(message.result, message.toolName);
return (
<div className="min-w-0 overflow-x-hidden hyphens-auto break-words px-4 py-2">
<AIChatBubble>
<MarkdownContent content={formattedText} />
</AIChatBubble>
<ToolResponseMessage
toolId={message.toolId}
toolName={message.toolName}
result={message.result}
/>
</div>
);
}

View File

@@ -0,0 +1,109 @@
"use client";
import { Card } from "@/components/atoms/Card/Card";
import { Text } from "@/components/atoms/Text/Text";
import { cn } from "@/lib/utils";
import { CircleNotch, CheckCircle, XCircle } from "@phosphor-icons/react";
type OperationStatus =
| "pending"
| "started"
| "in_progress"
| "completed"
| "error";
interface Props {
status: OperationStatus;
message: string;
toolName?: string;
className?: string;
}
function getOperationTitle(toolName?: string): string {
if (!toolName) return "Operation";
// Convert tool name to human-readable format
// e.g., "create_agent" -> "Creating Agent", "edit_agent" -> "Editing Agent"
if (toolName === "create_agent") return "Creating Agent";
if (toolName === "edit_agent") return "Editing Agent";
// Default: capitalize and format tool name
return toolName
.split("_")
.map((word) => word.charAt(0).toUpperCase() + word.slice(1))
.join(" ");
}
export function PendingOperationWidget({
status,
message,
toolName,
className,
}: Props) {
const isPending =
status === "pending" || status === "started" || status === "in_progress";
const isCompleted = status === "completed";
const isError = status === "error";
const operationTitle = getOperationTitle(toolName);
return (
<div
className={cn(
"group relative flex w-full justify-start gap-3 px-4 py-3",
className,
)}
>
<div className="flex w-full max-w-3xl gap-3">
<div className="flex-shrink-0">
<div
className={cn(
"flex h-7 w-7 items-center justify-center rounded-lg",
isPending && "bg-blue-500",
isCompleted && "bg-green-500",
isError && "bg-red-500",
)}
>
{isPending && (
<CircleNotch
className="h-4 w-4 animate-spin text-white"
weight="bold"
/>
)}
{isCompleted && (
<CheckCircle className="h-4 w-4 text-white" weight="bold" />
)}
{isError && (
<XCircle className="h-4 w-4 text-white" weight="bold" />
)}
</div>
</div>
<div className="flex min-w-0 flex-1 flex-col">
<Card className="space-y-2 p-4">
<div>
<Text variant="h4" className="mb-1 text-slate-900">
{isPending && operationTitle}
{isCompleted && `${operationTitle} Complete`}
{isError && `${operationTitle} Failed`}
</Text>
<Text variant="small" className="text-slate-600">
{message}
</Text>
</div>
{isPending && (
<Text variant="small" className="italic text-slate-500">
Check your library in a few minutes.
</Text>
)}
{toolName && (
<Text variant="small" className="text-slate-400">
Tool: {toolName}
</Text>
)}
</Card>
</div>
</div>
</div>
);
}

View File

@@ -1,7 +1,14 @@
import { Text } from "@/components/atoms/Text/Text";
import { cn } from "@/lib/utils";
import type { ToolResult } from "@/types/chat";
import { WarningCircleIcon } from "@phosphor-icons/react";
import { AIChatBubble } from "../AIChatBubble/AIChatBubble";
import { MarkdownContent } from "../MarkdownContent/MarkdownContent";
import { formatToolResponse } from "./helpers";
import {
formatToolResponse,
getErrorMessage,
isErrorResponse,
} from "./helpers";
export interface ToolResponseMessageProps {
toolId?: string;
@@ -18,6 +25,24 @@ export function ToolResponseMessage({
success: _success,
className,
}: ToolResponseMessageProps) {
if (isErrorResponse(result)) {
const errorMessage = getErrorMessage(result);
return (
<AIChatBubble className={className}>
<div className="flex items-center gap-2">
<WarningCircleIcon
size={14}
weight="regular"
className="shrink-0 text-neutral-400"
/>
<Text variant="small" className={cn("text-xs text-neutral-500")}>
{errorMessage}
</Text>
</div>
</AIChatBubble>
);
}
const formattedText = formatToolResponse(result, toolName);
return (

View File

@@ -1,3 +1,42 @@
function stripInternalReasoning(content: string): string {
return content
.replace(/<internal_reasoning>[\s\S]*?<\/internal_reasoning>/gi, "")
.replace(/<thinking>[\s\S]*?<\/thinking>/gi, "")
.replace(/\n{3,}/g, "\n\n")
.trim();
}
export function isErrorResponse(result: unknown): boolean {
if (typeof result === "string") {
const lower = result.toLowerCase();
return (
lower.startsWith("error:") ||
lower.includes("not found") ||
lower.includes("does not exist") ||
lower.includes("failed to") ||
lower.includes("unable to")
);
}
if (typeof result === "object" && result !== null) {
const response = result as Record<string, unknown>;
return response.type === "error" || response.error !== undefined;
}
return false;
}
export function getErrorMessage(result: unknown): string {
if (typeof result === "string") {
return stripInternalReasoning(result.replace(/^error:\s*/i, ""));
}
if (typeof result === "object" && result !== null) {
const response = result as Record<string, unknown>;
if (response.error) return stripInternalReasoning(String(response.error));
if (response.message)
return stripInternalReasoning(String(response.message));
}
return "An error occurred";
}
function getToolCompletionPhrase(toolName: string): string {
const toolCompletionPhrases: Record<string, string> = {
add_understanding: "Updated your business information",
@@ -28,10 +67,10 @@ export function formatToolResponse(result: unknown, toolName: string): string {
const parsed = JSON.parse(trimmed);
return formatToolResponse(parsed, toolName);
} catch {
return trimmed;
return stripInternalReasoning(trimmed);
}
}
return result;
return stripInternalReasoning(result);
}
if (typeof result !== "object" || result === null) {

View File

@@ -10,7 +10,7 @@ export function UserChatBubble({ children, className }: UserChatBubbleProps) {
return (
<div
className={cn(
"group relative min-w-20 overflow-hidden rounded-xl bg-purple-100 px-3 text-right text-[1rem] leading-relaxed transition-all duration-500 ease-in-out",
"group relative min-w-20 overflow-hidden rounded-xl bg-purple-100 px-3 text-left text-[1rem] leading-relaxed transition-all duration-500 ease-in-out",
className,
)}
style={{

View File

@@ -0,0 +1,142 @@
import type {
ActiveStream,
StreamChunk,
VercelStreamChunk,
} from "./chat-types";
import {
INITIAL_RETRY_DELAY,
MAX_RETRIES,
normalizeStreamChunk,
parseSSELine,
} from "./stream-utils";
function notifySubscribers(stream: ActiveStream, chunk: StreamChunk) {
stream.chunks.push(chunk);
for (const callback of stream.onChunkCallbacks) {
try {
callback(chunk);
} catch (err) {
console.warn("[StreamExecutor] Subscriber callback error:", err);
}
}
}
export async function executeStream(
stream: ActiveStream,
message: string,
isUserMessage: boolean,
context?: { url: string; content: string },
retryCount: number = 0,
): Promise<void> {
const { sessionId, abortController } = stream;
try {
const url = `/api/chat/sessions/${sessionId}/stream`;
const body = JSON.stringify({
message,
is_user_message: isUserMessage,
context: context || null,
});
const response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "text/event-stream",
},
body,
signal: abortController.signal,
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(errorText || `HTTP ${response.status}`);
}
if (!response.body) {
throw new Error("Response body is null");
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) {
notifySubscribers(stream, { type: "stream_end" });
stream.status = "completed";
return;
}
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
const data = parseSSELine(line);
if (data !== null) {
if (data === "[DONE]") {
notifySubscribers(stream, { type: "stream_end" });
stream.status = "completed";
return;
}
try {
const rawChunk = JSON.parse(data) as
| StreamChunk
| VercelStreamChunk;
const chunk = normalizeStreamChunk(rawChunk);
if (!chunk) continue;
notifySubscribers(stream, chunk);
if (chunk.type === "stream_end") {
stream.status = "completed";
return;
}
if (chunk.type === "error") {
stream.status = "error";
stream.error = new Error(
chunk.message || chunk.content || "Stream error",
);
return;
}
} catch (err) {
console.warn("[StreamExecutor] Failed to parse SSE chunk:", err);
}
}
}
}
} catch (err) {
if (err instanceof Error && err.name === "AbortError") {
notifySubscribers(stream, { type: "stream_end" });
stream.status = "completed";
return;
}
if (retryCount < MAX_RETRIES) {
const retryDelay = INITIAL_RETRY_DELAY * Math.pow(2, retryCount);
console.log(
`[StreamExecutor] Retrying in ${retryDelay}ms (attempt ${retryCount + 1}/${MAX_RETRIES})`,
);
await new Promise((resolve) => setTimeout(resolve, retryDelay));
return executeStream(
stream,
message,
isUserMessage,
context,
retryCount + 1,
);
}
stream.status = "error";
stream.error = err instanceof Error ? err : new Error("Stream failed");
notifySubscribers(stream, {
type: "error",
message: stream.error.message,
});
}
}

View File

@@ -0,0 +1,84 @@
import type { ToolArguments, ToolResult } from "@/types/chat";
import type { StreamChunk, VercelStreamChunk } from "./chat-types";
const LEGACY_STREAM_TYPES = new Set<StreamChunk["type"]>([
"text_chunk",
"text_ended",
"tool_call",
"tool_call_start",
"tool_response",
"login_needed",
"need_login",
"credentials_needed",
"error",
"usage",
"stream_end",
]);
export function isLegacyStreamChunk(
chunk: StreamChunk | VercelStreamChunk,
): chunk is StreamChunk {
return LEGACY_STREAM_TYPES.has(chunk.type as StreamChunk["type"]);
}
export function normalizeStreamChunk(
chunk: StreamChunk | VercelStreamChunk,
): StreamChunk | null {
if (isLegacyStreamChunk(chunk)) return chunk;
switch (chunk.type) {
case "text-delta":
return { type: "text_chunk", content: chunk.delta };
case "text-end":
return { type: "text_ended" };
case "tool-input-available":
return {
type: "tool_call_start",
tool_id: chunk.toolCallId,
tool_name: chunk.toolName,
arguments: chunk.input as ToolArguments,
};
case "tool-output-available":
return {
type: "tool_response",
tool_id: chunk.toolCallId,
tool_name: chunk.toolName,
result: chunk.output as ToolResult,
success: chunk.success ?? true,
};
case "usage":
return {
type: "usage",
promptTokens: chunk.promptTokens,
completionTokens: chunk.completionTokens,
totalTokens: chunk.totalTokens,
};
case "error":
return {
type: "error",
message: chunk.errorText,
code: chunk.code,
details: chunk.details,
};
case "finish":
return { type: "stream_end" };
case "start":
case "text-start":
return null;
case "tool-input-start":
return {
type: "tool_call_start",
tool_id: chunk.toolCallId,
tool_name: chunk.toolName,
arguments: {},
};
}
}
export const MAX_RETRIES = 3;
export const INITIAL_RETRY_DELAY = 1000;
export function parseSSELine(line: string): string | null {
if (line.startsWith("data: ")) return line.slice(6);
return null;
}

View File

@@ -2,7 +2,6 @@
import { useSupabase } from "@/lib/supabase/hooks/useSupabase";
import { useEffect, useRef, useState } from "react";
import { toast } from "sonner";
import { useChatSession } from "./useChatSession";
import { useChatStream } from "./useChatStream";
@@ -27,6 +26,7 @@ export function useChat({ urlSessionId }: UseChatArgs = {}) {
claimSession,
clearSession: clearSessionBase,
loadSession,
startPollingForOperation,
} = useChatSession({
urlSessionId,
autoCreate: false,
@@ -67,38 +67,16 @@ export function useChat({ urlSessionId }: UseChatArgs = {}) {
],
);
useEffect(() => {
if (isLoading || isCreating) {
const timer = setTimeout(() => {
setShowLoader(true);
}, 300);
return () => clearTimeout(timer);
} else {
useEffect(
function showLoaderWithDelay() {
if (isLoading || isCreating) {
const timer = setTimeout(() => setShowLoader(true), 300);
return () => clearTimeout(timer);
}
setShowLoader(false);
}
}, [isLoading, isCreating]);
useEffect(function monitorNetworkStatus() {
function handleOnline() {
toast.success("Connection restored", {
description: "You're back online",
});
}
function handleOffline() {
toast.error("You're offline", {
description: "Check your internet connection",
});
}
window.addEventListener("online", handleOnline);
window.addEventListener("offline", handleOffline);
return () => {
window.removeEventListener("online", handleOnline);
window.removeEventListener("offline", handleOffline);
};
}, []);
},
[isLoading, isCreating],
);
function clearSession() {
clearSessionBase();
@@ -117,5 +95,6 @@ export function useChat({ urlSessionId }: UseChatArgs = {}) {
loadSession,
sessionId: sessionIdFromHook,
showLoader,
startPollingForOperation,
};
}

View File

@@ -1,17 +0,0 @@
"use client";
import { create } from "zustand";
interface ChatDrawerState {
isOpen: boolean;
open: () => void;
close: () => void;
toggle: () => void;
}
export const useChatDrawer = create<ChatDrawerState>((set) => ({
isOpen: false,
open: () => set({ isOpen: true }),
close: () => set({ isOpen: false }),
toggle: () => set((state) => ({ isOpen: !state.isOpen })),
}));

View File

@@ -1,6 +1,7 @@
import {
getGetV2GetSessionQueryKey,
getGetV2GetSessionQueryOptions,
getGetV2ListSessionsQueryKey,
postV2CreateSession,
useGetV2GetSession,
usePatchV2SessionAssignUser,
@@ -58,6 +59,7 @@ export function useChatSession({
query: {
enabled: !!sessionId,
select: okData,
staleTime: 0,
retry: shouldRetrySessionLoad,
retryDelay: getSessionRetryDelay,
},
@@ -101,6 +103,125 @@ export function useChatSession({
}
}, [createError, loadError]);
// Track if we should be polling (set by external callers when they receive operation_started via SSE)
const [forcePolling, setForcePolling] = useState(false);
// Track if we've seen server acknowledge the pending operation (to avoid clearing forcePolling prematurely)
const hasSeenServerPendingRef = useRef(false);
// Check if there are any pending operations in the messages
// Must check all operation types: operation_pending, operation_started, operation_in_progress
const hasPendingOperationsFromServer = useMemo(() => {
if (!messages || messages.length === 0) return false;
const pendingTypes = new Set([
"operation_pending",
"operation_in_progress",
"operation_started",
]);
return messages.some((msg) => {
if (msg.role !== "tool" || !msg.content) return false;
try {
const content =
typeof msg.content === "string"
? JSON.parse(msg.content)
: msg.content;
return pendingTypes.has(content?.type);
} catch {
return false;
}
});
}, [messages]);
// Track when server has acknowledged the pending operation
useEffect(() => {
if (hasPendingOperationsFromServer) {
hasSeenServerPendingRef.current = true;
}
}, [hasPendingOperationsFromServer]);
// Combined: poll if server has pending ops OR if we received operation_started via SSE
const hasPendingOperations = hasPendingOperationsFromServer || forcePolling;
// Clear forcePolling only after server has acknowledged AND completed the operation
useEffect(() => {
if (
forcePolling &&
!hasPendingOperationsFromServer &&
hasSeenServerPendingRef.current
) {
// Server acknowledged the operation and it's now complete
setForcePolling(false);
hasSeenServerPendingRef.current = false;
}
}, [forcePolling, hasPendingOperationsFromServer]);
// Function to trigger polling (called when operation_started is received via SSE)
function startPollingForOperation() {
setForcePolling(true);
hasSeenServerPendingRef.current = false; // Reset for new operation
}
// Refresh sessions list when a pending operation completes
// (hasPendingOperations transitions from true to false)
const prevHasPendingOperationsRef = useRef(hasPendingOperations);
useEffect(
function refreshSessionsListOnOperationComplete() {
const wasHasPending = prevHasPendingOperationsRef.current;
prevHasPendingOperationsRef.current = hasPendingOperations;
// Only invalidate when transitioning from pending to not pending
if (wasHasPending && !hasPendingOperations && sessionId) {
queryClient.invalidateQueries({
queryKey: getGetV2ListSessionsQueryKey(),
});
}
},
[hasPendingOperations, sessionId, queryClient],
);
// Poll for updates when there are pending operations
// Backoff: 2s, 4s, 6s, 8s, 10s, ... up to 30s max
const pollAttemptRef = useRef(0);
const hasPendingOperationsRef = useRef(hasPendingOperations);
hasPendingOperationsRef.current = hasPendingOperations;
useEffect(
function pollForPendingOperations() {
if (!sessionId || !hasPendingOperations) {
pollAttemptRef.current = 0;
return;
}
let cancelled = false;
let timeoutId: ReturnType<typeof setTimeout> | null = null;
function schedule() {
// 2s, 4s, 6s, 8s, 10s, ... 30s (max)
const delay = Math.min((pollAttemptRef.current + 1) * 2000, 30000);
timeoutId = setTimeout(async () => {
if (cancelled) return;
pollAttemptRef.current += 1;
try {
await refetch();
} catch (err) {
console.error("[useChatSession] Poll failed:", err);
} finally {
if (!cancelled && hasPendingOperationsRef.current) {
schedule();
}
}
}, delay);
}
schedule();
return () => {
cancelled = true;
if (timeoutId) clearTimeout(timeoutId);
};
},
[sessionId, hasPendingOperations, refetch],
);
async function createSession() {
try {
setError(null);
@@ -227,11 +348,13 @@ export function useChatSession({
isCreating,
error,
isSessionNotFound: isNotFoundError(loadError),
hasPendingOperations,
createSession,
loadSession,
refreshSession,
claimSession,
clearSession,
startPollingForOperation,
};
}

View File

@@ -1,543 +1,110 @@
import type { ToolArguments, ToolResult } from "@/types/chat";
import { useCallback, useEffect, useRef, useState } from "react";
"use client";
import { useEffect, useRef, useState } from "react";
import { toast } from "sonner";
import { useChatStore } from "./chat-store";
import type { StreamChunk } from "./chat-types";
const MAX_RETRIES = 3;
const INITIAL_RETRY_DELAY = 1000;
export interface StreamChunk {
type:
| "text_chunk"
| "text_ended"
| "tool_call"
| "tool_call_start"
| "tool_response"
| "login_needed"
| "need_login"
| "credentials_needed"
| "error"
| "usage"
| "stream_end";
timestamp?: string;
content?: string;
message?: string;
code?: string;
details?: Record<string, unknown>;
tool_id?: string;
tool_name?: string;
arguments?: ToolArguments;
result?: ToolResult;
success?: boolean;
idx?: number;
session_id?: string;
agent_info?: {
graph_id: string;
name: string;
trigger_type: string;
};
provider?: string;
provider_name?: string;
credential_type?: string;
scopes?: string[];
title?: string;
[key: string]: unknown;
}
type VercelStreamChunk =
| { type: "start"; messageId: string }
| { type: "finish" }
| { type: "text-start"; id: string }
| { type: "text-delta"; id: string; delta: string }
| { type: "text-end"; id: string }
| { type: "tool-input-start"; toolCallId: string; toolName: string }
| {
type: "tool-input-available";
toolCallId: string;
toolName: string;
input: ToolArguments;
}
| {
type: "tool-output-available";
toolCallId: string;
toolName?: string;
output: ToolResult;
success?: boolean;
}
| {
type: "usage";
promptTokens: number;
completionTokens: number;
totalTokens: number;
}
| {
type: "error";
errorText: string;
code?: string;
details?: Record<string, unknown>;
};
const LEGACY_STREAM_TYPES = new Set<StreamChunk["type"]>([
"text_chunk",
"text_ended",
"tool_call",
"tool_call_start",
"tool_response",
"login_needed",
"need_login",
"credentials_needed",
"error",
"usage",
"stream_end",
]);
function isLegacyStreamChunk(
chunk: StreamChunk | VercelStreamChunk,
): chunk is StreamChunk {
return LEGACY_STREAM_TYPES.has(chunk.type as StreamChunk["type"]);
}
function normalizeStreamChunk(
chunk: StreamChunk | VercelStreamChunk,
): StreamChunk | null {
if (isLegacyStreamChunk(chunk)) {
return chunk;
}
switch (chunk.type) {
case "text-delta":
return { type: "text_chunk", content: chunk.delta };
case "text-end":
return { type: "text_ended" };
case "tool-input-available":
return {
type: "tool_call_start",
tool_id: chunk.toolCallId,
tool_name: chunk.toolName,
arguments: chunk.input,
};
case "tool-output-available":
return {
type: "tool_response",
tool_id: chunk.toolCallId,
tool_name: chunk.toolName,
result: chunk.output,
success: chunk.success ?? true,
};
case "usage":
return {
type: "usage",
promptTokens: chunk.promptTokens,
completionTokens: chunk.completionTokens,
totalTokens: chunk.totalTokens,
};
case "error":
return {
type: "error",
message: chunk.errorText,
code: chunk.code,
details: chunk.details,
};
case "finish":
return { type: "stream_end" };
case "start":
case "text-start":
return null;
case "tool-input-start":
const toolInputStart = chunk as Extract<
VercelStreamChunk,
{ type: "tool-input-start" }
>;
return {
type: "tool_call_start",
tool_id: toolInputStart.toolCallId,
tool_name: toolInputStart.toolName,
arguments: {},
};
}
}
export type { StreamChunk } from "./chat-types";
export function useChatStream() {
const [isStreaming, setIsStreaming] = useState(false);
const [error, setError] = useState<Error | null>(null);
const retryCountRef = useRef<number>(0);
const retryTimeoutRef = useRef<NodeJS.Timeout | null>(null);
const abortControllerRef = useRef<AbortController | null>(null);
const currentSessionIdRef = useRef<string | null>(null);
const requestStartTimeRef = useRef<number | null>(null);
const stopStreaming = useCallback(
(sessionId?: string, force: boolean = false) => {
console.log("[useChatStream] stopStreaming called", {
hasAbortController: !!abortControllerRef.current,
isAborted: abortControllerRef.current?.signal.aborted,
currentSessionId: currentSessionIdRef.current,
requestedSessionId: sessionId,
requestStartTime: requestStartTimeRef.current,
timeSinceStart: requestStartTimeRef.current
? Date.now() - requestStartTimeRef.current
: null,
force,
stack: new Error().stack,
});
if (
sessionId &&
currentSessionIdRef.current &&
currentSessionIdRef.current !== sessionId
) {
console.log(
"[useChatStream] Session changed, aborting previous stream",
{
oldSessionId: currentSessionIdRef.current,
newSessionId: sessionId,
},
);
}
const controller = abortControllerRef.current;
if (controller) {
const timeSinceStart = requestStartTimeRef.current
? Date.now() - requestStartTimeRef.current
: null;
if (!force && timeSinceStart !== null && timeSinceStart < 100) {
console.log(
"[useChatStream] Request just started (<100ms), skipping abort to prevent race condition",
{
timeSinceStart,
},
);
return;
}
try {
const signal = controller.signal;
if (
signal &&
typeof signal.aborted === "boolean" &&
!signal.aborted
) {
console.log("[useChatStream] Aborting stream");
controller.abort();
} else {
console.log(
"[useChatStream] Stream already aborted or signal invalid",
);
}
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
console.log(
"[useChatStream] AbortError caught (expected during cleanup)",
);
} else {
console.warn("[useChatStream] Error aborting stream:", error);
}
} finally {
abortControllerRef.current = null;
requestStartTimeRef.current = null;
}
}
if (retryTimeoutRef.current) {
clearTimeout(retryTimeoutRef.current);
retryTimeoutRef.current = null;
}
setIsStreaming(false);
},
[],
const onChunkCallbackRef = useRef<((chunk: StreamChunk) => void) | null>(
null,
);
const stopStream = useChatStore((s) => s.stopStream);
const unregisterActiveSession = useChatStore(
(s) => s.unregisterActiveSession,
);
const isSessionActive = useChatStore((s) => s.isSessionActive);
const onStreamComplete = useChatStore((s) => s.onStreamComplete);
const getCompletedStream = useChatStore((s) => s.getCompletedStream);
const registerActiveSession = useChatStore((s) => s.registerActiveSession);
const startStream = useChatStore((s) => s.startStream);
const getStreamStatus = useChatStore((s) => s.getStreamStatus);
function stopStreaming(sessionId?: string) {
const targetSession = sessionId || currentSessionIdRef.current;
if (targetSession) {
stopStream(targetSession);
unregisterActiveSession(targetSession);
}
setIsStreaming(false);
}
useEffect(() => {
console.log("[useChatStream] Component mounted");
return () => {
const sessionIdAtUnmount = currentSessionIdRef.current;
console.log(
"[useChatStream] Component unmounting, calling stopStreaming",
{
sessionIdAtUnmount,
},
);
stopStreaming(undefined, false);
return function cleanup() {
const sessionId = currentSessionIdRef.current;
if (sessionId && !isSessionActive(sessionId)) {
stopStream(sessionId);
}
currentSessionIdRef.current = null;
onChunkCallbackRef.current = null;
};
}, [stopStreaming]);
}, []);
const sendMessage = useCallback(
async (
sessionId: string,
message: string,
onChunk: (chunk: StreamChunk) => void,
isUserMessage: boolean = true,
context?: { url: string; content: string },
isRetry: boolean = false,
) => {
console.log("[useChatStream] sendMessage called", {
sessionId,
message: message.substring(0, 50),
isUserMessage,
isRetry,
stack: new Error().stack,
});
useEffect(() => {
const unsubscribe = onStreamComplete(
function handleStreamComplete(completedSessionId) {
if (completedSessionId !== currentSessionIdRef.current) return;
const previousSessionId = currentSessionIdRef.current;
stopStreaming(sessionId, true);
currentSessionIdRef.current = sessionId;
const abortController = new AbortController();
abortControllerRef.current = abortController;
requestStartTimeRef.current = Date.now();
console.log("[useChatStream] Created new AbortController", {
sessionId,
previousSessionId,
requestStartTime: requestStartTimeRef.current,
});
if (abortController.signal.aborted) {
console.warn(
"[useChatStream] AbortController was aborted before request started",
);
requestStartTimeRef.current = null;
return Promise.reject(new Error("Request aborted"));
}
if (!isRetry) {
retryCountRef.current = 0;
}
setIsStreaming(true);
setError(null);
try {
const url = `/api/chat/sessions/${sessionId}/stream`;
const body = JSON.stringify({
message,
is_user_message: isUserMessage,
context: context || null,
});
const response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "text/event-stream",
},
body,
signal: abortController.signal,
});
console.info("[useChatStream] Stream response", {
sessionId,
status: response.status,
ok: response.ok,
contentType: response.headers.get("content-type"),
});
if (!response.ok) {
const errorText = await response.text();
console.warn("[useChatStream] Stream response error", {
sessionId,
status: response.status,
errorText,
});
throw new Error(errorText || `HTTP ${response.status}`);
}
if (!response.body) {
console.warn("[useChatStream] Response body is null", { sessionId });
throw new Error("Response body is null");
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
let receivedChunkCount = 0;
let firstChunkAt: number | null = null;
let loggedLineCount = 0;
return new Promise<void>((resolve, reject) => {
let didDispatchStreamEnd = false;
function dispatchStreamEnd() {
if (didDispatchStreamEnd) return;
didDispatchStreamEnd = true;
onChunk({ type: "stream_end" });
}
const cleanup = () => {
reader.cancel().catch(() => {
// Ignore cancel errors
});
};
async function readStream() {
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
cleanup();
console.info("[useChatStream] Stream closed", {
sessionId,
receivedChunkCount,
timeSinceStart: requestStartTimeRef.current
? Date.now() - requestStartTimeRef.current
: null,
});
dispatchStreamEnd();
retryCountRef.current = 0;
stopStreaming();
resolve();
return;
}
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6);
if (loggedLineCount < 3) {
console.info("[useChatStream] Raw stream line", {
sessionId,
data:
data.length > 300 ? `${data.slice(0, 300)}...` : data,
});
loggedLineCount += 1;
}
if (data === "[DONE]") {
cleanup();
console.info("[useChatStream] Stream done marker", {
sessionId,
receivedChunkCount,
timeSinceStart: requestStartTimeRef.current
? Date.now() - requestStartTimeRef.current
: null,
});
dispatchStreamEnd();
retryCountRef.current = 0;
stopStreaming();
resolve();
return;
}
try {
const rawChunk = JSON.parse(data) as
| StreamChunk
| VercelStreamChunk;
const chunk = normalizeStreamChunk(rawChunk);
if (!chunk) {
continue;
}
if (!firstChunkAt) {
firstChunkAt = Date.now();
console.info("[useChatStream] First stream chunk", {
sessionId,
chunkType: chunk.type,
timeSinceStart: requestStartTimeRef.current
? firstChunkAt - requestStartTimeRef.current
: null,
});
}
receivedChunkCount += 1;
// Call the chunk handler
onChunk(chunk);
// Handle stream lifecycle
if (chunk.type === "stream_end") {
didDispatchStreamEnd = true;
cleanup();
console.info("[useChatStream] Stream end chunk", {
sessionId,
receivedChunkCount,
timeSinceStart: requestStartTimeRef.current
? Date.now() - requestStartTimeRef.current
: null,
});
retryCountRef.current = 0;
stopStreaming();
resolve();
return;
} else if (chunk.type === "error") {
cleanup();
reject(
new Error(
chunk.message || chunk.content || "Stream error",
),
);
return;
}
} catch (err) {
// Skip invalid JSON lines
console.warn("Failed to parse SSE chunk:", err, data);
}
}
}
}
} catch (err) {
if (err instanceof Error && err.name === "AbortError") {
cleanup();
dispatchStreamEnd();
stopStreaming();
resolve();
return;
}
const streamError =
err instanceof Error ? err : new Error("Failed to read stream");
if (retryCountRef.current < MAX_RETRIES) {
retryCountRef.current += 1;
const retryDelay =
INITIAL_RETRY_DELAY * Math.pow(2, retryCountRef.current - 1);
toast.info("Connection interrupted", {
description: `Retrying in ${retryDelay / 1000} seconds...`,
});
retryTimeoutRef.current = setTimeout(() => {
sendMessage(
sessionId,
message,
onChunk,
isUserMessage,
context,
true,
).catch((_err) => {
// Retry failed
});
}, retryDelay);
} else {
setError(streamError);
toast.error("Connection Failed", {
description:
"Unable to connect to chat service. Please try again.",
});
cleanup();
dispatchStreamEnd();
retryCountRef.current = 0;
stopStreaming();
reject(streamError);
}
}
}
readStream();
});
} catch (err) {
if (err instanceof Error && err.name === "AbortError") {
setIsStreaming(false);
return Promise.resolve();
}
const streamError =
err instanceof Error ? err : new Error("Failed to start stream");
setError(streamError);
setIsStreaming(false);
throw streamError;
const completed = getCompletedStream(completedSessionId);
if (completed?.error) {
setError(completed.error);
}
unregisterActiveSession(completedSessionId);
},
);
return unsubscribe;
}, []);
async function sendMessage(
sessionId: string,
message: string,
onChunk: (chunk: StreamChunk) => void,
isUserMessage: boolean = true,
context?: { url: string; content: string },
) {
const previousSessionId = currentSessionIdRef.current;
if (previousSessionId && previousSessionId !== sessionId) {
stopStreaming(previousSessionId);
}
currentSessionIdRef.current = sessionId;
onChunkCallbackRef.current = onChunk;
setIsStreaming(true);
setError(null);
registerActiveSession(sessionId);
try {
await startStream(sessionId, message, isUserMessage, context, onChunk);
const status = getStreamStatus(sessionId);
if (status === "error") {
const completed = getCompletedStream(sessionId);
if (completed?.error) {
setError(completed.error);
toast.error("Connection Failed", {
description: "Unable to connect to chat service. Please try again.",
});
throw completed.error;
}
}
},
[stopStreaming],
);
} catch (err) {
const streamError =
err instanceof Error ? err : new Error("Failed to start stream");
setError(streamError);
throw streamError;
} finally {
setIsStreaming(false);
}
}
return {
isStreaming,

View File

@@ -9,11 +9,12 @@ import { ReactNode, useEffect, useRef } from "react";
export function PostHogProvider({ children }: { children: ReactNode }) {
const isPostHogEnabled = environment.isPostHogEnabled();
const postHogCredentials = environment.getPostHogCredentials();
useEffect(() => {
if (process.env.NEXT_PUBLIC_POSTHOG_KEY) {
posthog.init(process.env.NEXT_PUBLIC_POSTHOG_KEY, {
api_host: process.env.NEXT_PUBLIC_POSTHOG_HOST,
if (postHogCredentials.key) {
posthog.init(postHogCredentials.key, {
api_host: postHogCredentials.host,
defaults: "2025-11-30",
capture_pageview: false,
capture_pageleave: true,

View File

@@ -0,0 +1,8 @@
"use client";
import { useNetworkStatus } from "./useNetworkStatus";
export function NetworkStatusMonitor() {
useNetworkStatus();
return null;
}

View File

@@ -0,0 +1,28 @@
"use client";
import { useEffect } from "react";
import { toast } from "sonner";
export function useNetworkStatus() {
useEffect(function monitorNetworkStatus() {
function handleOnline() {
toast.success("Connection restored", {
description: "You're back online",
});
}
function handleOffline() {
toast.error("You're offline", {
description: "Check your internet connection",
});
}
window.addEventListener("online", handleOnline);
window.addEventListener("offline", handleOffline);
return function cleanup() {
window.removeEventListener("online", handleOnline);
window.removeEventListener("offline", handleOffline);
};
}, []);
}

View File

@@ -3,6 +3,7 @@ import { environment } from "../environment";
export enum SessionKey {
CHAT_SENT_INITIAL_PROMPTS = "chat_sent_initial_prompts",
CHAT_INITIAL_PROMPTS = "chat_initial_prompts",
}
function get(key: SessionKey) {

View File

@@ -1 +0,0 @@
# Video editing blocks