Compare commits


19 Commits

Author SHA1 Message Date
Nicholas Tindle
77453f5f15 Merge branch 'dev' into claude/fix-block-search-vector-error-QMbO4 2026-01-27 10:50:47 -06:00
Nicholas Tindle
9ffff490b5 fix(backend): add pod and pgvector diagnostics to vector query logging
Enhance debug logging for vector query failures to help identify which
database replicas have pgvector installed and which do not:

- Add get_pod_info() to capture pod hostname, name, namespace, and IP
- Extend get_connection_debug_info() to check pgvector extension status,
  including which schema it's installed in and list all extensions
- Log connection details BEFORE every vector query (not just on failure)
  to correlate backend_pid with success/failure patterns
- Include server_addr to identify which DB replica is being hit

This helps diagnose the 30% failure rate where identical queries randomly
fail with "type vector does not exist", likely because some replicas are
missing the pgvector extension.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-26 15:22:39 -06:00
Nicholas Tindle
097949b3e7 Merge branch 'feat/agent-generator' into claude/fix-block-search-vector-error-QMbO4 2026-01-26 13:10:02 -06:00
Zamil Majdy
7f1a1f636f Merge branch 'dev' into feat/agent-generator 2026-01-26 11:23:42 -06:00
Zamil Majdy
4dc4ca4256 fix(frontend): address PR review comments
- Remove unnecessary useCallback from handleClarificationAnswers
- Remove redundant comment for clarification_needed rendering
- Use Input component from design system instead of raw textarea
2026-01-26 10:38:23 -06:00
Claude
00730496e3 fix(backend): use SET LOCAL search_path for vector queries
The connection-level search_path via options parameter is being ignored,
likely by PgBouncer/Supavisor in transaction pooling mode. Logs showed
connections with wrong search_path ('"$user", public, extensions') and
wrong current_schema ('public' instead of 'platform').

This fix wraps vector-type queries (those using ::vector or <=>) in a
transaction with SET LOCAL search_path to ensure pgvector types are
always resolvable, regardless of PgBouncer configuration.

Also exposed SEARCH_PATH constant for use in other modules if needed.
2026-01-25 23:36:30 +00:00
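For reference, `SET LOCAL` reverts when the enclosing transaction ends, which is exactly the lifetime a transaction-pooling proxy guarantees for a single server connection. A minimal sketch of the pattern using psycopg; the driver choice, schema list, and `block_embeddings` table are illustrative assumptions, not code from this PR:

```python
import psycopg

async def vector_search(conninfo: str, embedding: list[float]) -> list:
    # pgvector accepts the text literal form "[x,y,z]"
    vec = "[" + ",".join(str(x) for x in embedding) + "]"
    async with await psycopg.AsyncConnection.connect(conninfo) as conn:
        async with conn.transaction():
            # SET LOCAL is undone at COMMIT/ROLLBACK, so it is safe even when
            # PgBouncer/Supavisor hands each transaction a different server connection.
            await conn.execute("SET LOCAL search_path TO platform, extensions, public")
            cur = await conn.execute(
                "SELECT id FROM block_embeddings ORDER BY embedding <=> %s::vector LIMIT 5",
                (vec,),
            )
            return await cur.fetchall()
```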
Claude
21c753b971 fix(backend): ensure pgvector search_path for all pooled connections
Fix random "type 'vector' does not exist" errors during copilot chat
block search. The error occurred because pooled database connections
could have inconsistent search_path configurations that didn't include
the schema where pgvector was installed.

The fix adds search_path to the PostgreSQL connection options to ensure
all connections include both the application schema and common extension
schemas (public, extensions) where pgvector may be installed across
different environments (local, CI, Supabase).

Also adds connection debug logging for "does not exist" errors to help
diagnose the related "CoPilotUnderstanding table does not exist" issue,
which may indicate connections routing to different database instances.

Debug info includes: search_path, current_schema, server_addr, backend_pid.
2026-01-25 22:26:10 +00:00
Zamil Majdy
732dfcbb63 Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into feat/agent-generator 2026-01-24 15:45:55 -06:00
Zamil Majdy
eebaf7df14 feat(frontend): implement clarification questions UI for agent generation
Add an interactive form to collect user answers when the agent-generator
service returns clarifying questions during agent creation/editing.

Changes:
- Add clarification_needed message type to ChatMessageData
- Create ClarificationQuestionsWidget component for collecting answers
- Update parseToolResponse to detect clarification_needed responses
- Integrate widget into ChatMessage rendering

Fixes an issue where users had no way to answer clarifying questions,
causing the chat to keep retrying without the necessary context.
2026-01-24 15:45:34 -06:00
Zamil Majdy
653aab44b6 Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into feat/agent-generator 2026-01-24 15:05:34 -06:00
Zamil Majdy
f0bc3f2a49 Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into feat/agent-generator 2026-01-24 11:16:02 -06:00
Zamil Majdy
e702d77cdf Revert "fix(backend): resolve OAuth test event loop issue"
This reverts commit 25d9dbac83.
2026-01-23 21:25:41 -06:00
Zamil Majdy
38741d2465 Merge branch 'dev' into feat/agent-generator 2026-01-23 21:23:00 -05:00
Zamil Majdy
25d9dbac83 fix(backend): resolve OAuth test event loop issue
Fix "RuntimeError: Event loop is closed" when running oauth_test.py in the
full test suite. Changed the client fixture from implicit session scope to
explicit function scope so httpx.AsyncClient no longer binds to event loops
that get closed or replaced between tests.

- Set @pytest.fixture(scope="function") for client fixture
- Added explicit timeout=30.0 to AsyncClient
- Added documentation explaining the fix

This resolves the last failing test in the suite (1722 passed, 1 error).
2026-01-23 20:20:16 -06:00
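A sketch of the fixture shape described above, assuming pytest-asyncio (the commit uses `@pytest.fixture`, which behaves the same under pytest-asyncio's auto mode); the base URL is a placeholder:

```python
import httpx
import pytest_asyncio

@pytest_asyncio.fixture(scope="function")
async def client():
    # A fresh AsyncClient per test keeps the client bound to that test's event
    # loop, avoiding "RuntimeError: Event loop is closed" between tests.
    async with httpx.AsyncClient(base_url="http://testserver", timeout=30.0) as c:
        yield c
```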
Zamil Majdy
fcbecf3502 Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into feat/agent-generator 2026-01-22 14:32:39 -05:00
Zamil Majdy
da9c4a4adf fix: wrap generate_agent call in try/except for consistency
Add an exception handler for AgentGeneratorNotConfiguredError around the
generate_agent call for defensive consistency, even though the handler around
decompose_goal would typically catch the error first.

Addresses CodeRabbit review suggestion.
2026-01-21 18:48:39 -05:00
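A minimal sketch of the defensive pattern; the stand-in bodies and response shapes are illustrative assumptions, not the PR's code:

```python
class AgentGeneratorNotConfiguredError(Exception):
    pass

async def decompose_goal(goal: str) -> list[str]:
    # Stand-in for the external service call; raises when unconfigured.
    raise AgentGeneratorNotConfiguredError("AGENTGENERATOR_HOST is not set")

async def generate_agent(steps: list[str]) -> dict:
    # Stand-in; wrapped in the same handler purely for defensive consistency.
    raise AgentGeneratorNotConfiguredError("AGENTGENERATOR_HOST is not set")

async def create_agent(goal: str) -> dict:
    try:
        steps = await decompose_goal(goal)  # typically raises first when unconfigured
        agent = await generate_agent(steps)
    except AgentGeneratorNotConfiguredError as e:
        return {"type": "error", "message": str(e)}
    return {"type": "agent_created", "agent": agent}
```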
Zamil Majdy
0ca73004e5 feat: add clear error when Agent Generator service is not configured
- Add AgentGeneratorNotConfiguredError exception
- Check service configuration before calling external service
- Return helpful error message in create_agent and edit_agent tools
- Update tests to mock is_external_service_configured

Addresses Sentry review comment about unconditional external service calls
2026-01-21 18:38:05 -05:00
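A sketch of the configuration guard; `is_external_service_configured` is named in the commit, while `require_external_service` and the exact error text are illustrative:

```python
import os

class AgentGeneratorNotConfiguredError(Exception):
    """The external Agent Generator service is not configured."""

def is_external_service_configured() -> bool:
    # The service is considered configured when AGENTGENERATOR_HOST is set.
    return bool(os.getenv("AGENTGENERATOR_HOST"))

def require_external_service() -> None:
    if not is_external_service_configured():
        raise AgentGeneratorNotConfiguredError(
            "Agent Generator service is not configured; set AGENTGENERATOR_HOST."
        )
```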
Zamil Majdy
9a786ed8d9 refactor: remove redundant local agent generation code
The external Agent Generator service handles fixing and validation
internally, so we no longer need these components in the backend:

- Removed client.py (built-in LLM client)
- Removed prompts.py (built-in prompts)
- Removed fixer.py (local agent fixing)
- Removed validator.py (local agent validation)
- Removed utils.py (utility functions for fixer/validator)

Simplified create_agent.py and edit_agent.py to directly use
the external service results without local post-processing.

Updated tests to match the simplified architecture.

This removes ~1,800 lines of code that duplicated functionality
already provided by the external Agent Generator service.
2026-01-21 18:13:09 -05:00
Zamil Majdy
0a435e2ffb feat(backend): add external Agent Generator service integration
Add support for delegating agent generation to an external microservice
when AGENTGENERATOR_HOST is configured, falling back to the built-in
LLM-based implementation when it is not.

Changes:
- Add agentgenerator_host, agentgenerator_port, agentgenerator_timeout settings
- Add service.py client for external Agent Generator API
- Update core.py to delegate to external service when configured
- Export is_external_service_configured and check_external_service_health
- Add comprehensive tests for service client and core integration
2026-01-21 17:44:56 -05:00
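A sketch of what the service client's configuration check and health check might look like; the settings field names follow the commit, while the defaults and the `/health` route are assumptions:

```python
import httpx
from pydantic import BaseModel

class AgentGeneratorSettings(BaseModel):
    # Field names follow the commit; default values here are illustrative.
    agentgenerator_host: str = ""
    agentgenerator_port: int = 8000
    agentgenerator_timeout: int = 120

def is_external_service_configured(s: AgentGeneratorSettings) -> bool:
    return bool(s.agentgenerator_host)

async def check_external_service_health(s: AgentGeneratorSettings) -> bool:
    # Assumes a conventional /health endpoint; the real route is not shown in this PR.
    url = f"http://{s.agentgenerator_host}:{s.agentgenerator_port}/health"
    async with httpx.AsyncClient(timeout=s.agentgenerator_timeout) as client:
        resp = await client.get(url)
        return resp.status_code == 200
```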
58 changed files with 981 additions and 2373 deletions

View File

@@ -16,6 +16,7 @@ See `docs/content/platform/getting-started.md` for setup instructions.
- Format Python code with `poetry run format`.
- Format frontend code using `pnpm format`.
## Frontend guidelines:
See `/frontend/CONTRIBUTING.md` for complete patterns. Quick reference:
@@ -32,17 +33,14 @@ See `/frontend/CONTRIBUTING.md` for complete patterns. Quick reference:
4. **Styling**: Tailwind CSS only, use design tokens, Phosphor Icons only
5. **Testing**: Add Storybook stories for new components, Playwright for E2E
6. **Code conventions**: Function declarations (not arrow functions) for components/handlers
- Component props should be `interface Props { ... }` (not exported) unless the interface needs to be used outside the component
- Separate render logic from business logic (component.tsx + useComponent.ts + helpers.ts)
- Colocate state when possible and avoid creating large components, use sub-components ( local `/components` folder next to the parent component ) when sensible
- Avoid large hooks, abstract logic into `helpers.ts` files when sensible
- Use function declarations for components, arrow functions only for callbacks
- No barrel files or `index.ts` re-exports
- Do not use `useCallback` or `useMemo` unless strictly needed
- Avoid comments at all times unless the code is very complex
- Do not use `useCallback` or `useMemo` unless asked to optimise a given function
- Do not type hook returns, let Typescript infer as much as possible
- Never type with `any`, if not types available use `unknown`
## Testing
@@ -51,8 +49,22 @@ See `/frontend/CONTRIBUTING.md` for complete patterns. Quick reference:
Always run the relevant linters and tests before committing.
Use conventional commit messages for all commits (e.g. `feat(backend): add API`).
Types: - feat - fix - refactor - ci - dx (developer experience)
Scopes: - platform - platform/library - platform/marketplace - backend - backend/executor - frontend - frontend/library - frontend/marketplace - blocks
Types:
- feat
- fix
- refactor
- ci
- dx (developer experience)
Scopes:
- platform
- platform/library
- platform/marketplace
- backend
- backend/executor
- frontend
- frontend/library
- frontend/marketplace
- blocks
## Pull requests

View File

@@ -85,6 +85,17 @@ pnpm format
pnpm types
```
**📖 Complete Guide**: See `/frontend/CONTRIBUTING.md` and `/frontend/.cursorrules` for comprehensive frontend patterns.
**Key Frontend Conventions:**
- Separate render logic from data/behavior in components
- Use generated API hooks from `@/app/api/__generated__/endpoints/`
- Use function declarations (not arrow functions) for components/handlers
- Use design system components from `src/components/` (atoms, molecules, organisms)
- Only use Phosphor Icons
- Never use `src/components/__legacy__/*` or deprecated `BackendAPI`
## Architecture Overview
### Backend Architecture
@@ -206,17 +217,14 @@ See `/frontend/CONTRIBUTING.md` for complete patterns. Quick reference:
4. **Styling**: Tailwind CSS only, use design tokens, Phosphor Icons only
5. **Testing**: Add Storybook stories for new components, Playwright for E2E
6. **Code conventions**: Function declarations (not arrow functions) for components/handlers
- Component props should be `interface Props { ... }` (not exported) unless the interface needs to be used outside the component
- Separate render logic from business logic (component.tsx + useComponent.ts + helpers.ts)
- Colocate state when possible and avoid creating large components, use sub-components ( local `/components` folder next to the parent component ) when sensible
- Avoid large hooks, abstract logic into `helpers.ts` files when sensible
- Use function declarations for components, arrow functions only for callbacks
- No barrel files or `index.ts` re-exports
- Do not use `useCallback` or `useMemo` unless asked to optimise a given function
- Do not use `useCallback` or `useMemo` unless strictly needed
- Avoid comments at all times unless the code is very complex
- Do not type hook returns, let Typescript infer as much as possible
- Never type with `any`, if not types available use `unknown`
### Security Implementation

View File

@@ -33,15 +33,9 @@ class ChatConfig(BaseSettings):
stream_timeout: int = Field(default=300, description="Stream timeout in seconds")
max_retries: int = Field(default=3, description="Maximum number of retries")
max_agent_runs: int = Field(default=30, description="Maximum number of agent runs")
max_agent_runs: int = Field(default=3, description="Maximum number of agent runs")
max_agent_schedules: int = Field(
default=30, description="Maximum number of agent schedules"
)
# Long-running operation configuration
long_running_operation_ttl: int = Field(
default=600,
description="TTL in seconds for long-running operation tracking in Redis (safety net if pod dies)",
default=3, description="Maximum number of agent schedules"
)
# Langfuse Prompt Management Configuration

View File

@@ -247,45 +247,3 @@ async def get_chat_session_message_count(session_id: str) -> int:
"""Get the number of messages in a chat session."""
count = await PrismaChatMessage.prisma().count(where={"sessionId": session_id})
return count
async def update_tool_message_content(
session_id: str,
tool_call_id: str,
new_content: str,
) -> bool:
"""Update the content of a tool message in chat history.
Used by background tasks to update pending operation messages with final results.
Args:
session_id: The chat session ID.
tool_call_id: The tool call ID to find the message.
new_content: The new content to set.
Returns:
True if a message was updated, False otherwise.
"""
try:
result = await PrismaChatMessage.prisma().update_many(
where={
"sessionId": session_id,
"toolCallId": tool_call_id,
},
data={
"content": new_content,
},
)
if result == 0:
logger.warning(
f"No message found to update for session {session_id}, "
f"tool_call_id {tool_call_id}"
)
return False
return True
except Exception as e:
logger.error(
f"Failed to update tool message for session {session_id}, "
f"tool_call_id {tool_call_id}: {e}"
)
return False

View File

@@ -295,21 +295,6 @@ async def cache_chat_session(session: ChatSession) -> None:
await _cache_session(session)
async def invalidate_session_cache(session_id: str) -> None:
"""Invalidate a chat session from Redis cache.
Used by background tasks to ensure fresh data is loaded on next access.
This is best-effort - Redis failures are logged but don't fail the operation.
"""
try:
redis_key = _get_session_cache_key(session_id)
async_redis = await get_redis_async()
await async_redis.delete(redis_key)
except Exception as e:
# Best-effort: log but don't fail - cache will expire naturally
logger.warning(f"Failed to invalidate session cache for {session_id}: {e}")
async def _get_session_from_db(session_id: str) -> ChatSession | None:
"""Get a chat session from the database."""
prisma_session = await chat_db.get_chat_session(session_id)

View File

@@ -17,7 +17,6 @@ from openai import (
)
from openai.types.chat import ChatCompletionChunk, ChatCompletionToolParam
from backend.data.redis_client import get_redis_async
from backend.data.understanding import (
format_understanding_for_prompt,
get_business_understanding,
@@ -25,7 +24,6 @@ from backend.data.understanding import (
from backend.util.exceptions import NotFoundError
from backend.util.settings import Settings
from . import db as chat_db
from .config import ChatConfig
from .model import (
ChatMessage,
@@ -33,7 +31,6 @@ from .model import (
Usage,
cache_chat_session,
get_chat_session,
invalidate_session_cache,
update_session_title,
upsert_chat_session,
)
@@ -51,13 +48,8 @@ from .response_model import (
StreamToolOutputAvailable,
StreamUsage,
)
from .tools import execute_tool, get_tool, tools
from .tools.models import (
ErrorResponse,
OperationInProgressResponse,
OperationPendingResponse,
OperationStartedResponse,
)
from .tools import execute_tool, tools
from .tools.models import ErrorResponse
from .tracking import track_user_message
logger = logging.getLogger(__name__)
@@ -69,126 +61,11 @@ client = openai.AsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
langfuse = get_client()
# Redis key prefix for tracking running long-running operations
# Used for idempotency across Kubernetes pods - prevents duplicate executions on browser refresh
RUNNING_OPERATION_PREFIX = "chat:running_operation:"
# Default system prompt used when Langfuse is not configured
# This is a snapshot of the "CoPilot Prompt" from Langfuse (version 11)
DEFAULT_SYSTEM_PROMPT = """You are **Otto**, an AI Co-Pilot for AutoGPT and a Forward-Deployed Automation Engineer serving small business owners. Your mission is to help users automate business tasks with AI by delivering tangible value through working automations—not through documentation or lengthy explanations.
class LangfuseNotConfiguredError(Exception):
"""Raised when Langfuse is required but not configured."""
Here is everything you know about the current user from previous interactions:
<users_information>
{users_information}
</users_information>
## YOUR CORE MANDATE
You are action-oriented. Your success is measured by:
- **Value Delivery**: Does the user think "wow, that was amazing" or "what was the point"?
- **Demonstrable Proof**: Show working automations, not descriptions of what's possible
- **Time Saved**: Focus on tangible efficiency gains
- **Quality Output**: Deliver results that meet or exceed expectations
## YOUR WORKFLOW
Adapt flexibly to the conversation context. Not every interaction requires all stages:
1. **Explore & Understand**: Learn about the user's business, tasks, and goals. Use `add_understanding` to capture important context that will improve future conversations.
2. **Assess Automation Potential**: Help the user understand whether and how AI can automate their task.
3. **Prepare for AI**: Provide brief, actionable guidance on prerequisites (data, access, etc.).
4. **Discover or Create Agents**:
- **Always check the user's library first** with `find_library_agent` (these may be customized to their needs)
- Search the marketplace with `find_agent` for pre-built automations
- Find reusable components with `find_block`
- Create custom solutions with `create_agent` if nothing suitable exists
- Modify existing library agents with `edit_agent`
5. **Execute**: Run automations immediately, schedule them, or set up webhooks using `run_agent`. Test specific components with `run_block`.
6. **Show Results**: Display outputs using `agent_output`.
## AVAILABLE TOOLS
**Understanding & Discovery:**
- `add_understanding`: Create a memory about the user's business or use cases for future sessions
- `search_docs`: Search platform documentation for specific technical information
- `get_doc_page`: Retrieve full text of a specific documentation page
**Agent Discovery:**
- `find_library_agent`: Search the user's existing agents (CHECK HERE FIRST—these may be customized)
- `find_agent`: Search the marketplace for pre-built automations
- `find_block`: Find pre-written code units that perform specific tasks (agents are built from blocks)
**Agent Creation & Editing:**
- `create_agent`: Create a new automation agent
- `edit_agent`: Modify an agent in the user's library
**Execution & Output:**
- `run_agent`: Run an agent now, schedule it, or set up a webhook trigger
- `run_block`: Test or run a specific block independently
- `agent_output`: View results from previous agent runs
## BEHAVIORAL GUIDELINES
**Be Concise:**
- Target 2-5 short lines maximum
- Make every word count—no repetition or filler
- Use lightweight structure for scannability (bullets, numbered lists, short prompts)
- Avoid jargon (blocks, slugs, cron) unless the user asks
**Be Proactive:**
- Suggest next steps before being asked
- Anticipate needs based on conversation context and user information
- Look for opportunities to expand scope when relevant
- Reveal capabilities through action, not explanation
**Use Tools Effectively:**
- Select the right tool for each task
- **Always check `find_library_agent` before searching the marketplace**
- Use `add_understanding` to capture valuable business context
- When tool calls fail, try alternative approaches
## CRITICAL REMINDER
You are NOT a chatbot. You are NOT documentation. You are a partner who helps busy business owners get value quickly by showing proof through working automations. Bias toward action over explanation."""
# Module-level set to hold strong references to background tasks.
# This prevents asyncio from garbage collecting tasks before they complete.
# Tasks are automatically removed on completion via done_callback.
_background_tasks: set[asyncio.Task] = set()
async def _mark_operation_started(tool_call_id: str) -> bool:
"""Mark a long-running operation as started (Redis-based).
Returns True if successfully marked (operation was not already running),
False if operation was already running (lost race condition).
Raises exception if Redis is unavailable (fail-closed).
"""
redis = await get_redis_async()
key = f"{RUNNING_OPERATION_PREFIX}{tool_call_id}"
# SETNX with TTL - atomic "set if not exists"
result = await redis.set(key, "1", ex=config.long_running_operation_ttl, nx=True)
return result is not None
async def _mark_operation_completed(tool_call_id: str) -> None:
"""Mark a long-running operation as completed (remove Redis key).
This is best-effort - if Redis fails, the TTL will eventually clean up.
"""
try:
redis = await get_redis_async()
key = f"{RUNNING_OPERATION_PREFIX}{tool_call_id}"
await redis.delete(key)
except Exception as e:
# Non-critical: TTL will clean up eventually
logger.warning(f"Failed to delete running operation key {tool_call_id}: {e}")
pass
def _is_langfuse_configured() -> bool:
@@ -198,30 +75,6 @@ def _is_langfuse_configured() -> bool:
)
async def _get_system_prompt_template(context: str) -> str:
"""Get the system prompt, trying Langfuse first with fallback to default.
Args:
context: The user context/information to compile into the prompt.
Returns:
The compiled system prompt string.
"""
if _is_langfuse_configured():
try:
# cache_ttl_seconds=0 disables SDK caching to always get the latest prompt
# Use asyncio.to_thread to avoid blocking the event loop
prompt = await asyncio.to_thread(
langfuse.get_prompt, config.langfuse_prompt_name, cache_ttl_seconds=0
)
return prompt.compile(users_information=context)
except Exception as e:
logger.warning(f"Failed to fetch prompt from Langfuse, using default: {e}")
# Fallback to default prompt
return DEFAULT_SYSTEM_PROMPT.format(users_information=context)
async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
"""Build the full system prompt including business understanding if available.
@@ -230,8 +83,12 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
If "default" and this is the user's first session, will use "onboarding" instead.
Returns:
Tuple of (compiled prompt string, business understanding object)
Tuple of (compiled prompt string, Langfuse prompt object for tracing)
"""
# cache_ttl_seconds=0 disables SDK caching to always get the latest prompt
prompt = langfuse.get_prompt(config.langfuse_prompt_name, cache_ttl_seconds=0)
# If user is authenticated, try to fetch their business understanding
understanding = None
if user_id:
@@ -240,13 +97,12 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
except Exception as e:
logger.warning(f"Failed to fetch business understanding: {e}")
understanding = None
if understanding:
context = format_understanding_for_prompt(understanding)
else:
context = "This is the first time you are meeting the user. Greet them and introduce them to the platform"
compiled = await _get_system_prompt_template(context)
compiled = prompt.compile(users_information=context)
return compiled, understanding
@@ -354,6 +210,16 @@ async def stream_chat_completion(
f"Streaming chat completion for session {session_id} for message {message} and user id {user_id}. Message is user message: {is_user_message}"
)
# Check if Langfuse is configured - required for chat functionality
if not _is_langfuse_configured():
logger.error("Chat request failed: Langfuse is not configured")
yield StreamError(
errorText="Chat service is not available. Langfuse must be configured "
"with LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY environment variables."
)
yield StreamFinish()
return
# Only fetch from Redis if session not provided (initial call)
if session is None:
session = await get_chat_session(session_id, user_id)
@@ -449,7 +315,6 @@ async def stream_chat_completion(
has_yielded_end = False
has_yielded_error = False
has_done_tool_call = False
has_long_running_tool_call = False # Track if we had a long-running tool call
has_received_text = False
text_streaming_ended = False
tool_response_messages: list[ChatMessage] = []
@@ -471,6 +336,7 @@ async def stream_chat_completion(
system_prompt=system_prompt,
text_block_id=text_block_id,
):
if isinstance(chunk, StreamTextStart):
# Emit text-start before first text delta
if not has_received_text:
@@ -528,34 +394,13 @@ async def stream_chat_completion(
if isinstance(chunk.output, str)
else orjson.dumps(chunk.output).decode("utf-8")
)
# Skip saving long-running operation responses - messages already saved in _yield_tool_call
# Use JSON parsing instead of substring matching to avoid false positives
is_long_running_response = False
try:
parsed = orjson.loads(result_content)
if isinstance(parsed, dict) and parsed.get("type") in (
"operation_started",
"operation_in_progress",
):
is_long_running_response = True
except (orjson.JSONDecodeError, TypeError):
pass # Not JSON or not a dict - treat as regular response
if is_long_running_response:
# Remove from accumulated_tool_calls since assistant message was already saved
accumulated_tool_calls[:] = [
tc
for tc in accumulated_tool_calls
if tc["id"] != chunk.toolCallId
]
has_long_running_tool_call = True
else:
tool_response_messages.append(
ChatMessage(
role="tool",
content=result_content,
tool_call_id=chunk.toolCallId,
)
tool_response_messages.append(
ChatMessage(
role="tool",
content=result_content,
tool_call_id=chunk.toolCallId,
)
)
has_done_tool_call = True
# Track if any tool execution failed
if not chunk.success:
@@ -731,14 +576,7 @@ async def stream_chat_completion(
logger.info(
f"Extended session messages, new message_count={len(session.messages)}"
)
# Save if there are regular (non-long-running) tool responses or streaming message.
# Long-running tools save their own state, but we still need to save regular tools
# that may be in the same response.
has_regular_tool_responses = len(tool_response_messages) > 0
if has_regular_tool_responses or (
not has_long_running_tool_call
and (messages_to_save or has_appended_streaming_message)
):
if messages_to_save or has_appended_streaming_message:
await upsert_chat_session(session)
else:
logger.info(
@@ -747,9 +585,7 @@ async def stream_chat_completion(
)
# If we did a tool call, stream the chat completion again to get the next response
# Skip only if ALL tools were long-running (they handle their own completion)
has_regular_tools = len(tool_response_messages) > 0
if has_done_tool_call and (has_regular_tools or not has_long_running_tool_call):
if has_done_tool_call:
logger.info(
"Tool call executed, streaming chat completion again to get assistant response"
)
@@ -889,114 +725,6 @@ async def _summarize_messages(
return summary or "No summary available."
def _ensure_tool_pairs_intact(
recent_messages: list[dict],
all_messages: list[dict],
start_index: int,
) -> list[dict]:
"""
Ensure tool_call/tool_response pairs stay together after slicing.
When slicing messages for context compaction, a naive slice can separate
an assistant message containing tool_calls from its corresponding tool
response messages. This causes API validation errors (e.g., Anthropic's
"unexpected tool_use_id found in tool_result blocks").
This function checks for orphan tool responses in the slice and extends
backwards to include their corresponding assistant messages.
Args:
recent_messages: The sliced messages to validate
all_messages: The complete message list (for looking up missing assistants)
start_index: The index in all_messages where recent_messages begins
Returns:
A potentially extended list of messages with tool pairs intact
"""
if not recent_messages:
return recent_messages
# Collect all tool_call_ids from assistant messages in the slice
available_tool_call_ids: set[str] = set()
for msg in recent_messages:
if msg.get("role") == "assistant" and msg.get("tool_calls"):
for tc in msg["tool_calls"]:
tc_id = tc.get("id")
if tc_id:
available_tool_call_ids.add(tc_id)
# Find orphan tool responses (tool messages whose tool_call_id is missing)
orphan_tool_call_ids: set[str] = set()
for msg in recent_messages:
if msg.get("role") == "tool":
tc_id = msg.get("tool_call_id")
if tc_id and tc_id not in available_tool_call_ids:
orphan_tool_call_ids.add(tc_id)
if not orphan_tool_call_ids:
# No orphans, slice is valid
return recent_messages
# Find the assistant messages that contain the orphan tool_call_ids
# Search backwards from start_index in all_messages
messages_to_prepend: list[dict] = []
for i in range(start_index - 1, -1, -1):
msg = all_messages[i]
if msg.get("role") == "assistant" and msg.get("tool_calls"):
msg_tool_ids = {tc.get("id") for tc in msg["tool_calls"] if tc.get("id")}
if msg_tool_ids & orphan_tool_call_ids:
# This assistant message has tool_calls we need
# Also collect its contiguous tool responses that follow it
assistant_and_responses: list[dict] = [msg]
# Scan forward from this assistant to collect tool responses
for j in range(i + 1, start_index):
following_msg = all_messages[j]
if following_msg.get("role") == "tool":
tool_id = following_msg.get("tool_call_id")
if tool_id and tool_id in msg_tool_ids:
assistant_and_responses.append(following_msg)
else:
# Stop at first non-tool message
break
# Prepend the assistant and its tool responses (maintain order)
messages_to_prepend = assistant_and_responses + messages_to_prepend
# Mark these as found
orphan_tool_call_ids -= msg_tool_ids
# Also add this assistant's tool_call_ids to available set
available_tool_call_ids |= msg_tool_ids
if not orphan_tool_call_ids:
# Found all missing assistants
break
if orphan_tool_call_ids:
# Some tool_call_ids couldn't be resolved - remove those tool responses
# This shouldn't happen in normal operation but handles edge cases
logger.warning(
f"Could not find assistant messages for tool_call_ids: {orphan_tool_call_ids}. "
"Removing orphan tool responses."
)
recent_messages = [
msg
for msg in recent_messages
if not (
msg.get("role") == "tool"
and msg.get("tool_call_id") in orphan_tool_call_ids
)
]
if messages_to_prepend:
logger.info(
f"Extended recent messages by {len(messages_to_prepend)} to preserve "
f"tool_call/tool_response pairs"
)
return messages_to_prepend + recent_messages
return recent_messages
async def _stream_chat_chunks(
session: ChatSession,
tools: list[ChatCompletionToolParam],
@@ -1088,15 +816,7 @@ async def _stream_chat_chunks(
# Always attempt mitigation when over limit, even with few messages
if messages:
# Split messages based on whether system prompt exists
# Calculate start index for the slice
slice_start = max(0, len(messages_dict) - KEEP_RECENT)
recent_messages = messages_dict[-KEEP_RECENT:]
# Ensure tool_call/tool_response pairs stay together
# This prevents API errors from orphan tool responses
recent_messages = _ensure_tool_pairs_intact(
recent_messages, messages_dict, slice_start
)
recent_messages = messages[-KEEP_RECENT:]
if has_system_prompt:
# Keep system prompt separate, summarize everything between system and recent
@@ -1183,13 +903,6 @@ async def _stream_chat_chunks(
if len(recent_messages) >= keep_count
else recent_messages
)
# Ensure tool pairs stay intact in the reduced slice
reduced_slice_start = max(
0, len(recent_messages) - keep_count
)
reduced_recent = _ensure_tool_pairs_intact(
reduced_recent, recent_messages, reduced_slice_start
)
if has_system_prompt:
messages = [
system_msg,
@@ -1248,10 +961,7 @@ async def _stream_chat_chunks(
# Create a base list excluding system prompt to avoid duplication
# This is the pool of messages we'll slice from in the loop
# Use messages_dict for type consistency with _ensure_tool_pairs_intact
base_msgs = (
messages_dict[1:] if has_system_prompt else messages_dict
)
base_msgs = messages[1:] if has_system_prompt else messages
# Try progressively smaller keep counts
new_token_count = token_count # Initialize with current count
@@ -1274,12 +984,6 @@ async def _stream_chat_chunks(
# Slice from base_msgs to get recent messages (without system prompt)
recent_messages = base_msgs[-keep_count:]
# Ensure tool pairs stay intact in the reduced slice
reduced_slice_start = max(0, len(base_msgs) - keep_count)
recent_messages = _ensure_tool_pairs_intact(
recent_messages, base_msgs, reduced_slice_start
)
if has_system_prompt:
messages = [system_msg] + recent_messages
else:
@@ -1556,19 +1260,17 @@ async def _yield_tool_call(
"""
Yield a tool call and its execution result.
For tools marked with `is_long_running=True` (like agent generation), spawns a
background task so the operation survives SSE disconnections. For other tools,
yields heartbeat events every 15 seconds to keep the SSE connection alive.
For long-running tools, yields heartbeat events every 15 seconds to keep
the SSE connection alive through proxies and load balancers.
Raises:
orjson.JSONDecodeError: If tool call arguments cannot be parsed as JSON
KeyError: If expected tool call fields are missing
TypeError: If tool call structure is invalid
"""
import uuid as uuid_module
tool_name = tool_calls[yield_idx]["function"]["name"]
tool_call_id = tool_calls[yield_idx]["id"]
logger.info(f"Yielding tool call: {tool_calls[yield_idx]}")
# Parse tool call arguments - handle empty arguments gracefully
raw_arguments = tool_calls[yield_idx]["function"]["arguments"]
@@ -1583,151 +1285,7 @@ async def _yield_tool_call(
input=arguments,
)
# Check if this tool is long-running (survives SSE disconnection)
tool = get_tool(tool_name)
if tool and tool.is_long_running:
# Atomic check-and-set: returns False if operation already running (lost race)
if not await _mark_operation_started(tool_call_id):
logger.info(
f"Tool call {tool_call_id} already in progress, returning status"
)
# Build dynamic message based on tool name
if tool_name == "create_agent":
in_progress_msg = "Agent creation already in progress. Please wait..."
elif tool_name == "edit_agent":
in_progress_msg = "Agent edit already in progress. Please wait..."
else:
in_progress_msg = f"{tool_name} already in progress. Please wait..."
yield StreamToolOutputAvailable(
toolCallId=tool_call_id,
toolName=tool_name,
output=OperationInProgressResponse(
message=in_progress_msg,
tool_call_id=tool_call_id,
).model_dump_json(),
success=True,
)
return
# Generate operation ID
operation_id = str(uuid_module.uuid4())
# Build a user-friendly message based on tool and arguments
if tool_name == "create_agent":
agent_desc = arguments.get("description", "")
# Truncate long descriptions for the message
desc_preview = (
(agent_desc[:100] + "...") if len(agent_desc) > 100 else agent_desc
)
pending_msg = (
f"Creating your agent: {desc_preview}"
if desc_preview
else "Creating agent... This may take a few minutes."
)
started_msg = (
"Agent creation started. You can close this tab - "
"check your library in a few minutes."
)
elif tool_name == "edit_agent":
changes = arguments.get("changes", "")
changes_preview = (changes[:100] + "...") if len(changes) > 100 else changes
pending_msg = (
f"Editing agent: {changes_preview}"
if changes_preview
else "Editing agent... This may take a few minutes."
)
started_msg = (
"Agent edit started. You can close this tab - "
"check your library in a few minutes."
)
else:
pending_msg = f"Running {tool_name}... This may take a few minutes."
started_msg = (
f"{tool_name} started. You can close this tab - "
"check back in a few minutes."
)
# Track appended messages for rollback on failure
assistant_message: ChatMessage | None = None
pending_message: ChatMessage | None = None
# Wrap session save and task creation in try-except to release lock on failure
try:
# Save assistant message with tool_call FIRST (required by LLM)
assistant_message = ChatMessage(
role="assistant",
content="",
tool_calls=[tool_calls[yield_idx]],
)
session.messages.append(assistant_message)
# Then save pending tool result
pending_message = ChatMessage(
role="tool",
content=OperationPendingResponse(
message=pending_msg,
operation_id=operation_id,
tool_name=tool_name,
).model_dump_json(),
tool_call_id=tool_call_id,
)
session.messages.append(pending_message)
await upsert_chat_session(session)
logger.info(
f"Saved pending operation {operation_id} for tool {tool_name} "
f"in session {session.session_id}"
)
# Store task reference in module-level set to prevent GC before completion
task = asyncio.create_task(
_execute_long_running_tool(
tool_name=tool_name,
parameters=arguments,
tool_call_id=tool_call_id,
operation_id=operation_id,
session_id=session.session_id,
user_id=session.user_id,
)
)
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
except Exception as e:
# Roll back appended messages to prevent data corruption on subsequent saves
if (
pending_message
and session.messages
and session.messages[-1] == pending_message
):
session.messages.pop()
if (
assistant_message
and session.messages
and session.messages[-1] == assistant_message
):
session.messages.pop()
# Release the Redis lock since the background task won't be spawned
await _mark_operation_completed(tool_call_id)
logger.error(
f"Failed to setup long-running tool {tool_name}: {e}", exc_info=True
)
raise
# Return immediately - don't wait for completion
yield StreamToolOutputAvailable(
toolCallId=tool_call_id,
toolName=tool_name,
output=OperationStartedResponse(
message=started_msg,
operation_id=operation_id,
tool_name=tool_name,
).model_dump_json(),
success=True,
)
return
# Normal flow: Run tool execution in background task with heartbeats
# Run tool execution in background task with heartbeats to keep connection alive
tool_task = asyncio.create_task(
execute_tool(
tool_name=tool_name,
@@ -1777,190 +1335,3 @@ async def _yield_tool_call(
)
yield tool_execution_response
async def _execute_long_running_tool(
tool_name: str,
parameters: dict[str, Any],
tool_call_id: str,
operation_id: str,
session_id: str,
user_id: str | None,
) -> None:
"""Execute a long-running tool in background and update chat history with result.
This function runs independently of the SSE connection, so the operation
survives if the user closes their browser tab.
"""
try:
# Load fresh session (not stale reference)
session = await get_chat_session(session_id, user_id)
if not session:
logger.error(f"Session {session_id} not found for background tool")
return
# Execute the actual tool
result = await execute_tool(
tool_name=tool_name,
parameters=parameters,
tool_call_id=tool_call_id,
user_id=user_id,
session=session,
)
# Update the pending message with result
await _update_pending_operation(
session_id=session_id,
tool_call_id=tool_call_id,
result=(
result.output
if isinstance(result.output, str)
else orjson.dumps(result.output).decode("utf-8")
),
)
logger.info(f"Background tool {tool_name} completed for session {session_id}")
# Generate LLM continuation so user sees response when they poll/refresh
await _generate_llm_continuation(session_id=session_id, user_id=user_id)
except Exception as e:
logger.error(f"Background tool {tool_name} failed: {e}", exc_info=True)
error_response = ErrorResponse(
message=f"Tool {tool_name} failed: {str(e)}",
)
await _update_pending_operation(
session_id=session_id,
tool_call_id=tool_call_id,
result=error_response.model_dump_json(),
)
finally:
await _mark_operation_completed(tool_call_id)
async def _update_pending_operation(
session_id: str,
tool_call_id: str,
result: str,
) -> None:
"""Update the pending tool message with final result.
This is called by background tasks when long-running operations complete.
"""
# Update the message in database
updated = await chat_db.update_tool_message_content(
session_id=session_id,
tool_call_id=tool_call_id,
new_content=result,
)
if updated:
# Invalidate Redis cache so next load gets fresh data
# Wrap in try/except to prevent cache failures from triggering error handling
# that would overwrite our successful DB update
try:
await invalidate_session_cache(session_id)
except Exception as e:
# Non-critical: cache will eventually be refreshed on next load
logger.warning(f"Failed to invalidate cache for session {session_id}: {e}")
logger.info(
f"Updated pending operation for tool_call_id {tool_call_id} "
f"in session {session_id}"
)
else:
logger.warning(
f"Failed to update pending operation for tool_call_id {tool_call_id} "
f"in session {session_id}"
)
async def _generate_llm_continuation(
session_id: str,
user_id: str | None,
) -> None:
"""Generate an LLM response after a long-running tool completes.
This is called by background tasks to continue the conversation
after a tool result is saved. The response is saved to the database
so users see it when they refresh or poll.
"""
try:
# Load fresh session from DB (bypass cache to get the updated tool result)
await invalidate_session_cache(session_id)
session = await get_chat_session(session_id, user_id)
if not session:
logger.error(f"Session {session_id} not found for LLM continuation")
return
# Build system prompt
system_prompt, _ = await _build_system_prompt(user_id)
# Build messages in OpenAI format
messages = session.to_openai_messages()
if system_prompt:
from openai.types.chat import ChatCompletionSystemMessageParam
system_message = ChatCompletionSystemMessageParam(
role="system",
content=system_prompt,
)
messages = [system_message] + messages
# Build extra_body for tracing
extra_body: dict[str, Any] = {
"posthogProperties": {
"environment": settings.config.app_env.value,
},
}
if user_id:
extra_body["user"] = user_id[:128]
extra_body["posthogDistinctId"] = user_id
if session_id:
extra_body["session_id"] = session_id[:128]
# Make non-streaming LLM call (no tools - just text response)
from typing import cast
from openai.types.chat import ChatCompletionMessageParam
# No tools parameter = text-only response (no tool calls)
response = await client.chat.completions.create(
model=config.model,
messages=cast(list[ChatCompletionMessageParam], messages),
extra_body=extra_body,
)
if response.choices and response.choices[0].message.content:
assistant_content = response.choices[0].message.content
# Reload session from DB to avoid race condition with user messages
# that may have been sent while we were generating the LLM response
fresh_session = await get_chat_session(session_id, user_id)
if not fresh_session:
logger.error(
f"Session {session_id} disappeared during LLM continuation"
)
return
# Save assistant message to database
assistant_message = ChatMessage(
role="assistant",
content=assistant_content,
)
fresh_session.messages.append(assistant_message)
# Save to database (not cache) to persist the response
await upsert_chat_session(fresh_session)
# Invalidate cache so next poll/refresh gets fresh data
await invalidate_session_cache(session_id)
logger.info(
f"Generated LLM continuation for session {session_id}, "
f"response length: {len(assistant_content)}"
)
else:
logger.warning(f"LLM continuation returned empty response for {session_id}")
except Exception as e:
logger.error(f"Failed to generate LLM continuation: {e}", exc_info=True)

View File

@@ -49,11 +49,6 @@ tools: list[ChatCompletionToolParam] = [
]
def get_tool(tool_name: str) -> BaseTool | None:
"""Get a tool instance by name."""
return TOOL_REGISTRY.get(tool_name)
async def execute_tool(
tool_name: str,
parameters: dict[str, Any],
@@ -62,7 +57,7 @@ async def execute_tool(
tool_call_id: str,
) -> "StreamToolOutputAvailable":
"""Execute a tool by name."""
tool = get_tool(tool_name)
tool = TOOL_REGISTRY.get(tool_name)
if not tool:
raise ValueError(f"Tool {tool_name} not found")

View File

@@ -36,16 +36,6 @@ class BaseTool:
"""Whether this tool requires authentication."""
return False
@property
def is_long_running(self) -> bool:
"""Whether this tool is long-running and should execute in background.
Long-running tools (like agent generation) are executed via background
tasks to survive SSE disconnections. The result is persisted to chat
history and visible when the user refreshes.
"""
return False
def as_openai_tool(self) -> ChatCompletionToolParam:
"""Convert to OpenAI tool format."""
return ChatCompletionToolParam(

View File

@@ -42,10 +42,6 @@ class CreateAgentTool(BaseTool):
def requires_auth(self) -> bool:
return True
@property
def is_long_running(self) -> bool:
return True
@property
def parameters(self) -> dict[str, Any]:
return {

View File

@@ -42,10 +42,6 @@ class EditAgentTool(BaseTool):
def requires_auth(self) -> bool:
return True
@property
def is_long_running(self) -> bool:
return True
@property
def parameters(self) -> dict[str, Any]:
return {

View File

@@ -28,10 +28,6 @@ class ResponseType(str, Enum):
BLOCK_OUTPUT = "block_output"
DOC_SEARCH_RESULTS = "doc_search_results"
DOC_PAGE = "doc_page"
# Long-running operation types
OPERATION_STARTED = "operation_started"
OPERATION_PENDING = "operation_pending"
OPERATION_IN_PROGRESS = "operation_in_progress"
# Base response model
@@ -338,39 +334,3 @@ class BlockOutputResponse(ToolResponseBase):
block_name: str
outputs: dict[str, list[Any]]
success: bool = True
# Long-running operation models
class OperationStartedResponse(ToolResponseBase):
"""Response when a long-running operation has been started in the background.
This is returned immediately to the client while the operation continues
to execute. The user can close the tab and check back later.
"""
type: ResponseType = ResponseType.OPERATION_STARTED
operation_id: str
tool_name: str
class OperationPendingResponse(ToolResponseBase):
"""Response stored in chat history while a long-running operation is executing.
This is persisted to the database so users see a pending state when they
refresh before the operation completes.
"""
type: ResponseType = ResponseType.OPERATION_PENDING
operation_id: str
tool_name: str
class OperationInProgressResponse(ToolResponseBase):
"""Response when an operation is already in progress.
Returned for idempotency when the same tool_call_id is requested again
while the background task is still running.
"""
type: ResponseType = ResponseType.OPERATION_IN_PROGRESS
tool_call_id: str

View File

@@ -265,13 +265,9 @@ async def get_onboarding_agents(
"/onboarding/enabled",
summary="Is onboarding enabled",
tags=["onboarding", "public"],
dependencies=[Security(requires_user)],
)
async def is_onboarding_enabled(
user_id: Annotated[str, Security(get_user_id)],
) -> bool:
# If chat is enabled for user, skip legacy onboarding
if await is_feature_enabled(Flag.CHAT, user_id, False):
return False
async def is_onboarding_enabled() -> bool:
return await onboarding_enabled()

View File

@@ -26,6 +26,31 @@ def add_param(url: str, key: str, value: str) -> str:
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://localhost:5432")
# Extract the application schema from DATABASE_URL for use in queries
_parsed = urlparse(DATABASE_URL)
_query_params = dict(parse_qsl(_parsed.query))
_app_schema = _query_params.get("schema", "public")
# Build search_path that includes app schema and extension schemas where pgvector may live.
# This is used both in connection options (may be ignored by PgBouncer) and in SET LOCAL
# statements before raw queries (guaranteed to work).
SEARCH_PATH = (
f"{_app_schema},extensions,public"
if _app_schema != "public"
else "public,extensions"
)
# Try to set search_path via PostgreSQL options parameter at connection time.
# NOTE: This may be ignored by PgBouncer in transaction pooling mode.
# As a fallback, we also SET LOCAL search_path before raw queries.
if "options" in _query_params:
_query_params["options"] = (
_query_params["options"] + f" -c search_path={SEARCH_PATH}"
)
else:
_query_params["options"] = f"-c search_path={SEARCH_PATH}"
DATABASE_URL = urlunparse(_parsed._replace(query=urlencode(_query_params)))
CONN_LIMIT = os.getenv("DB_CONNECTION_LIMIT")
if CONN_LIMIT:
DATABASE_URL = add_param(DATABASE_URL, "connection_limit", CONN_LIMIT)
@@ -108,6 +133,70 @@ def get_database_schema() -> str:
return query_params.get("schema", "public")
def get_pod_info() -> dict:
"""Get information about the current pod/host.
Returns dict with: hostname, pod_name (from HOSTNAME env var in k8s),
pod_namespace, pod_ip if available.
"""
import socket
return {
"hostname": socket.gethostname(),
"pod_name": os.getenv("HOSTNAME", "unknown"),
"pod_namespace": os.getenv("POD_NAMESPACE", "unknown"),
"pod_ip": os.getenv("POD_IP", "unknown"),
}
async def get_connection_debug_info(tx=None) -> dict:
"""Get diagnostic info about the current database connection and pod.
Useful for debugging "table does not exist" or "type does not exist" errors
that may indicate connections going to different database instances or pods.
Args:
tx: Optional transaction client to use for the query (ensures same connection)
Returns dict with: search_path, current_schema, server_version, pg_backend_pid,
pgvector_installed, pgvector_schema, plus pod info
"""
import prisma as prisma_module
pod_info = get_pod_info()
db_client = tx if tx else prisma_module.get_client()
try:
# Get connection info and check for pgvector in a single query
result = await db_client.query_raw(
"""
SELECT
current_setting('search_path') as search_path,
current_schema() as current_schema,
current_database() as current_database,
inet_server_addr() as server_addr,
inet_server_port() as server_port,
pg_backend_pid() as backend_pid,
version() as server_version,
(SELECT EXISTS(
SELECT 1 FROM pg_extension WHERE extname = 'vector'
)) as pgvector_installed,
(SELECT nspname FROM pg_extension e
JOIN pg_namespace n ON e.extnamespace = n.oid
WHERE e.extname = 'vector'
LIMIT 1) as pgvector_schema,
(SELECT string_agg(extname || ' in ' || nspname, ', ')
FROM pg_extension e
JOIN pg_namespace n ON e.extnamespace = n.oid
) as all_extensions
"""
)
db_info = result[0] if result else {}
return {**pod_info, **db_info}
except Exception as e:
return {**pod_info, "db_error": str(e)}
async def _raw_with_schema(
query_template: str,
*args,
@@ -124,8 +213,9 @@ async def _raw_with_schema(
Note on pgvector types:
Use unqualified ::vector and <=> operator in queries. PostgreSQL resolves
these via search_path, which includes the schema where pgvector is installed
on all environments (local, CI, dev).
these via search_path. The connection's search_path is configured at module
load to include common extension schemas (public, extensions) where pgvector
may be installed across different environments (local, CI, Supabase).
Args:
query_template: SQL query with {schema_prefix} and/or {schema} placeholders
@@ -155,12 +245,60 @@ async def _raw_with_schema(
db_client = client if client else prisma_module.get_client()
if execute:
result = await db_client.execute_raw(formatted_query, *args) # type: ignore
else:
result = await db_client.query_raw(formatted_query, *args) # type: ignore
# For queries that might use pgvector types (::vector or <=> operator),
# we need to ensure search_path includes the schema where pgvector is installed.
# PgBouncer in transaction mode may ignore connection-level options, so we
# use SET LOCAL within a transaction to guarantee correct search_path.
needs_vector_search_path = "::vector" in formatted_query or "<=>" in formatted_query
return result
try:
if needs_vector_search_path and client is None:
# Use transaction to set search_path for vector queries
async with db_client.tx() as tx:
# Log debug info BEFORE the query to capture which backend we're hitting
debug_info = await get_connection_debug_info(tx)
logger.info(
f"Vector query starting. backend_pid={debug_info.get('backend_pid')}, "
f"server_addr={debug_info.get('server_addr')}, "
f"pgvector_installed={debug_info.get('pgvector_installed')}, "
f"pgvector_schema={debug_info.get('pgvector_schema')}, "
f"search_path={debug_info.get('search_path')}, "
f"pod={debug_info.get('pod_name')}"
)
await tx.execute_raw(f"SET LOCAL search_path TO {SEARCH_PATH}")
if execute:
result = await tx.execute_raw(formatted_query, *args) # type: ignore
else:
result = await tx.query_raw(formatted_query, *args) # type: ignore
logger.info(
f"Vector query SUCCESS. backend_pid={debug_info.get('backend_pid')}"
)
else:
# Regular query without vector types, or already in a transaction
if execute:
result = await db_client.execute_raw(formatted_query, *args) # type: ignore
else:
result = await db_client.query_raw(formatted_query, *args) # type: ignore
return result
except Exception as e:
error_msg = str(e)
# Log connection debug info for "does not exist" errors to help diagnose
# whether connections are going to different database instances
if "does not exist" in error_msg:
try:
debug_info = await get_connection_debug_info()
logger.error(
f"Vector query FAILED. Connection debug info: {debug_info}. "
f"Query template: {query_template[:200]}... Error: {error_msg}"
)
except Exception:
logger.error(
f"Vector query FAILED (debug info unavailable). "
f"Query template: {query_template[:200]}... Error: {error_msg}"
)
raise
async def query_raw_with_schema(query_template: str, *args) -> list[dict]:

View File

@@ -41,7 +41,6 @@ FrontendOnboardingStep = Literal[
OnboardingStep.AGENT_NEW_RUN,
OnboardingStep.AGENT_INPUT,
OnboardingStep.CONGRATS,
OnboardingStep.VISIT_COPILOT,
OnboardingStep.MARKETPLACE_VISIT,
OnboardingStep.BUILDER_OPEN,
]
@@ -123,9 +122,6 @@ async def update_user_onboarding(user_id: str, data: UserOnboardingUpdate):
async def _reward_user(user_id: str, onboarding: UserOnboarding, step: OnboardingStep):
reward = 0
match step:
# Welcome bonus for visiting copilot ($5 = 500 credits)
case OnboardingStep.VISIT_COPILOT:
reward = 500
# Reward user when they clicked New Run during onboarding
# This is because they need credits before scheduling a run (next step)
# This is seen as a reward for the GET_RESULTS step in the wallet

View File

@@ -216,7 +216,27 @@ async def get_business_understanding(
# Cache miss - load from database
logger.debug(f"Business understanding cache miss for user {user_id}")
record = await CoPilotUnderstanding.prisma().find_unique(where={"userId": user_id})
try:
record = await CoPilotUnderstanding.prisma().find_unique(where={"userId": user_id})
except Exception as e:
error_msg = str(e)
if "does not exist" in error_msg:
# Log connection debug info to diagnose if connections go to different DBs
from backend.data.db import get_connection_debug_info
try:
debug_info = await get_connection_debug_info()
logger.error(
f"CoPilotUnderstanding table not found. Connection debug: {debug_info}. "
f"Error: {error_msg}"
)
except Exception:
logger.error(
f"CoPilotUnderstanding table not found (debug unavailable). "
f"Error: {error_msg}"
)
raise
if record is None:
return None

View File

@@ -359,8 +359,8 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="The port for the Agent Generator service",
)
agentgenerator_timeout: int = Field(
default=600,
description="The timeout in seconds for Agent Generator service requests (includes retries for rate limits)",
default=120,
description="The timeout in seconds for Agent Generator service requests",
)
enable_example_blocks: bool = Field(

View File

@@ -1,2 +0,0 @@
-- AlterEnum
ALTER TYPE "OnboardingStep" ADD VALUE 'VISIT_COPILOT';

View File

@@ -81,7 +81,6 @@ enum OnboardingStep {
AGENT_INPUT
CONGRATS
// First Wins
VISIT_COPILOT
GET_RESULTS
MARKETPLACE_VISIT
MARKETPLACE_ADD_AGENT

View File

@@ -34,6 +34,3 @@ NEXT_PUBLIC_PREVIEW_STEALING_DEV=
# PostHog Analytics
NEXT_PUBLIC_POSTHOG_KEY=
NEXT_PUBLIC_POSTHOG_HOST=https://eu.i.posthog.com
# OpenAI (for voice transcription)
OPENAI_API_KEY=

View File

@@ -1,10 +1,12 @@
"use client";
import { ChatLoader } from "@/components/contextual/Chat/components/ChatLoader/ChatLoader";
import { Text } from "@/components/atoms/Text/Text";
import { NAVBAR_HEIGHT_PX } from "@/lib/constants";
import type { ReactNode } from "react";
import { useEffect } from "react";
import { useCopilotStore } from "../../copilot-page-store";
import { DesktopSidebar } from "./components/DesktopSidebar/DesktopSidebar";
import { LoadingState } from "./components/LoadingState/LoadingState";
import { MobileDrawer } from "./components/MobileDrawer/MobileDrawer";
import { MobileHeader } from "./components/MobileHeader/MobileHeader";
import { useCopilotShell } from "./useCopilotShell";
@@ -18,21 +20,38 @@ export function CopilotShell({ children }: Props) {
isMobile,
isDrawerOpen,
isLoading,
isCreatingSession,
isLoggedIn,
hasActiveSession,
sessions,
currentSessionId,
handleSelectSession,
handleOpenDrawer,
handleCloseDrawer,
handleDrawerOpenChange,
handleNewChatClick,
handleSessionClick,
handleNewChat,
hasNextPage,
isFetchingNextPage,
fetchNextPage,
isReadyToShowContent,
} = useCopilotShell();
const setNewChatHandler = useCopilotStore((s) => s.setNewChatHandler);
const requestNewChat = useCopilotStore((s) => s.requestNewChat);
useEffect(
function registerNewChatHandler() {
setNewChatHandler(handleNewChat);
return function cleanup() {
setNewChatHandler(null);
};
},
[handleNewChat],
);
function handleNewChatClick() {
requestNewChat();
}
if (!isLoggedIn) {
return (
<div className="flex h-full items-center justify-center">
@@ -53,7 +72,7 @@ export function CopilotShell({ children }: Props) {
isLoading={isLoading}
hasNextPage={hasNextPage}
isFetchingNextPage={isFetchingNextPage}
onSelectSession={handleSessionClick}
onSelectSession={handleSelectSession}
onFetchNextPage={fetchNextPage}
onNewChat={handleNewChatClick}
hasActiveSession={Boolean(hasActiveSession)}
@@ -63,18 +82,7 @@ export function CopilotShell({ children }: Props) {
<div className="relative flex min-h-0 flex-1 flex-col">
{isMobile && <MobileHeader onOpenDrawer={handleOpenDrawer} />}
<div className="flex min-h-0 flex-1 flex-col">
{isCreatingSession ? (
<div className="flex h-full flex-1 flex-col items-center justify-center bg-[#f8f8f9]">
<div className="flex flex-col items-center gap-4">
<ChatLoader />
<Text variant="body" className="text-zinc-500">
Creating your chat...
</Text>
</div>
</div>
) : (
children
)}
{isReadyToShowContent ? children : <LoadingState />}
</div>
</div>
@@ -86,7 +94,7 @@ export function CopilotShell({ children }: Props) {
isLoading={isLoading}
hasNextPage={hasNextPage}
isFetchingNextPage={isFetchingNextPage}
onSelectSession={handleSessionClick}
onSelectSession={handleSelectSession}
onFetchNextPage={fetchNextPage}
onNewChat={handleNewChatClick}
onClose={handleCloseDrawer}
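
The shell no longer owns the interrupt flow itself: it registers its `handleNewChat` callback in the copilot store, and `requestNewChat` dispatches to whichever handler is currently registered. A minimal sketch of that registration pattern with a standalone zustand store (names here are illustrative, not the app's API):

```ts
import { create } from "zustand";
import { useEffect } from "react";

interface HandlerStore {
  newChatHandler: (() => void) | null;
  setNewChatHandler: (handler: (() => void) | null) => void;
  requestNewChat: () => void;
}

const useHandlerStore = create<HandlerStore>((set, get) => ({
  newChatHandler: null,
  setNewChatHandler(handler) {
    set({ newChatHandler: handler });
  },
  requestNewChat() {
    // Delegates to whichever component most recently registered a handler.
    get().newChatHandler?.();
  },
}));

// Register on mount, clear on unmount — mirroring registerNewChatHandler above.
function useRegisterNewChat(handleNewChat: () => void) {
  const setNewChatHandler = useHandlerStore((s) => s.setNewChatHandler);
  useEffect(() => {
    setNewChatHandler(handleNewChat);
    return () => setNewChatHandler(null);
  }, [handleNewChat, setNewChatHandler]);
}
```

This lets a distant component (here, the page-level confirmation modal) trigger the latest handler without prop drilling.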

View File

@@ -0,0 +1,15 @@
import { Text } from "@/components/atoms/Text/Text";
import { ChatLoader } from "@/components/contextual/Chat/components/ChatLoader/ChatLoader";
export function LoadingState() {
return (
<div className="flex flex-1 items-center justify-center">
<div className="flex flex-col items-center gap-4">
<ChatLoader />
<Text variant="body" className="text-zinc-500">
Loading your chats...
</Text>
</div>
</div>
);
}

View File

@@ -3,17 +3,17 @@ import { useState } from "react";
export function useMobileDrawer() {
const [isDrawerOpen, setIsDrawerOpen] = useState(false);
const handleOpenDrawer = () => {
function handleOpenDrawer() {
setIsDrawerOpen(true);
};
}
const handleCloseDrawer = () => {
function handleCloseDrawer() {
setIsDrawerOpen(false);
};
}
const handleDrawerOpenChange = (open: boolean) => {
function handleDrawerOpenChange(open: boolean) {
setIsDrawerOpen(open);
};
}
return {
isDrawerOpen,

View File

@@ -1,6 +1,11 @@
import { useGetV2ListSessions } from "@/app/api/__generated__/endpoints/chat/chat";
import {
getGetV2ListSessionsQueryKey,
useGetV2ListSessions,
} from "@/app/api/__generated__/endpoints/chat/chat";
import type { SessionSummaryResponse } from "@/app/api/__generated__/models/sessionSummaryResponse";
import { okData } from "@/app/api/helpers";
import { useChatStore } from "@/components/contextual/Chat/chat-store";
import { useQueryClient } from "@tanstack/react-query";
import { useEffect, useState } from "react";
const PAGE_SIZE = 50;
@@ -11,12 +16,12 @@ export interface UseSessionsPaginationArgs {
export function useSessionsPagination({ enabled }: UseSessionsPaginationArgs) {
const [offset, setOffset] = useState(0);
const [accumulatedSessions, setAccumulatedSessions] = useState<
SessionSummaryResponse[]
>([]);
const [totalCount, setTotalCount] = useState<number | null>(null);
const queryClient = useQueryClient();
const onStreamComplete = useChatStore((state) => state.onStreamComplete);
const { data, isLoading, isFetching, isError } = useGetV2ListSessions(
{ limit: PAGE_SIZE, offset },
@@ -27,23 +32,38 @@ export function useSessionsPagination({ enabled }: UseSessionsPaginationArgs) {
},
);
useEffect(() => {
const responseData = okData(data);
if (responseData) {
const newSessions = responseData.sessions;
const total = responseData.total;
setTotalCount(total);
if (offset === 0) {
setAccumulatedSessions(newSessions);
} else {
setAccumulatedSessions((prev) => [...prev, ...newSessions]);
}
} else if (!enabled) {
useEffect(function refreshOnStreamComplete() {
const unsubscribe = onStreamComplete(function handleStreamComplete() {
setOffset(0);
setAccumulatedSessions([]);
setTotalCount(null);
}
}, [data, offset, enabled]);
queryClient.invalidateQueries({
queryKey: getGetV2ListSessionsQueryKey(),
});
});
return unsubscribe;
}, []);
useEffect(
function updateSessionsFromResponse() {
const responseData = okData(data);
if (responseData) {
const newSessions = responseData.sessions;
const total = responseData.total;
setTotalCount(total);
if (offset === 0) {
setAccumulatedSessions(newSessions);
} else {
setAccumulatedSessions((prev) => [...prev, ...newSessions]);
}
} else if (!enabled) {
setAccumulatedSessions([]);
setTotalCount(null);
}
},
[data, offset, enabled],
);
const hasNextPage =
totalCount !== null && accumulatedSessions.length < totalCount;
@@ -66,17 +86,17 @@ export function useSessionsPagination({ enabled }: UseSessionsPaginationArgs) {
}
}, [hasNextPage, isFetching, isLoading, isError, totalCount]);
const fetchNextPage = () => {
function fetchNextPage() {
if (hasNextPage && !isFetching) {
setOffset((prev) => prev + PAGE_SIZE);
}
};
}
const reset = () => {
// Only reset the offset - keep existing sessions visible during refetch
// The effect will replace sessions when new data arrives at offset 0
function reset() {
setOffset(0);
};
setAccumulatedSessions([]);
setTotalCount(null);
}
return {
sessions: accumulatedSessions,
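
The accumulation rule the `updateSessionsFromResponse` effect implements is simple: offset 0 replaces the list (first page or after a reset), while any later offset appends. Expressed as a pure function, purely for illustration:

```ts
interface Session {
  id: string;
}

// Sketch of the accumulate-or-replace rule above; not part of the codebase.
function accumulate(prev: Session[], page: Session[], offset: number): Session[] {
  // Offset 0 means a fresh query (first page or after reset): replace the list.
  // Any later offset appends the newly fetched page.
  return offset === 0 ? page : [...prev, ...page];
}

// accumulate([], firstPage, 0)          -> firstPage
// accumulate(firstPage, secondPage, 50) -> [...firstPage, ...secondPage]
```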

View File

@@ -104,3 +104,76 @@ export function mergeCurrentSessionIntoList(
export function getCurrentSessionId(searchParams: URLSearchParams) {
return searchParams.get("sessionId");
}
export function shouldAutoSelectSession(
areAllSessionsLoaded: boolean,
hasAutoSelectedSession: boolean,
paramSessionId: string | null,
visibleSessions: SessionSummaryResponse[],
accumulatedSessions: SessionSummaryResponse[],
isLoading: boolean,
totalCount: number | null,
) {
if (!areAllSessionsLoaded || hasAutoSelectedSession) {
return {
shouldSelect: false,
sessionIdToSelect: null,
shouldCreate: false,
};
}
if (paramSessionId) {
return {
shouldSelect: false,
sessionIdToSelect: null,
shouldCreate: false,
};
}
if (visibleSessions.length > 0) {
return {
shouldSelect: true,
sessionIdToSelect: visibleSessions[0].id,
shouldCreate: false,
};
}
if (accumulatedSessions.length === 0 && !isLoading && totalCount === 0) {
return { shouldSelect: false, sessionIdToSelect: null, shouldCreate: true };
}
if (totalCount === 0) {
return {
shouldSelect: false,
sessionIdToSelect: null,
shouldCreate: false,
};
}
return { shouldSelect: false, sessionIdToSelect: null, shouldCreate: false };
}
export function checkReadyToShowContent(
areAllSessionsLoaded: boolean,
paramSessionId: string | null,
accumulatedSessions: SessionSummaryResponse[],
isCurrentSessionLoading: boolean,
currentSessionData: SessionDetailResponse | null | undefined,
hasAutoSelectedSession: boolean,
) {
if (!areAllSessionsLoaded) return false;
if (paramSessionId) {
const sessionFound = accumulatedSessions.some(
(s) => s.id === paramSessionId,
);
return (
sessionFound ||
(!isCurrentSessionLoading &&
currentSessionData !== undefined &&
currentSessionData !== null)
);
}
return hasAutoSelectedSession;
}
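
Both helpers are pure functions over plain inputs, so the auto-select and readiness rules are directly unit-testable. A hypothetical vitest sketch (assuming the helpers are exported from `./helpers`, with session objects trimmed to the fields the helper actually reads):

```ts
import { describe, expect, it } from "vitest";
import { shouldAutoSelectSession } from "./helpers";

describe("shouldAutoSelectSession", () => {
  it("auto-selects the first visible session once everything is loaded", () => {
    // Trimmed stand-ins for SessionSummaryResponse; only `id` is read here.
    const sessions = [{ id: "s1" }, { id: "s2" }] as any;
    const result = shouldAutoSelectSession(
      true, // areAllSessionsLoaded
      false, // hasAutoSelectedSession
      null, // paramSessionId
      sessions, // visibleSessions
      sessions, // accumulatedSessions
      false, // isLoading
      2, // totalCount
    );
    expect(result).toEqual({
      shouldSelect: true,
      sessionIdToSelect: "s1",
      shouldCreate: false,
    });
  });

  it("asks for a new session when the user has none at all", () => {
    const result = shouldAutoSelectSession(true, false, null, [], [], false, 0);
    expect(result.shouldCreate).toBe(true);
  });
});
```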

View File

@@ -1,22 +1,26 @@
"use client";
import {
getGetV2GetSessionQueryKey,
getGetV2ListSessionsQueryKey,
useGetV2GetSession,
} from "@/app/api/__generated__/endpoints/chat/chat";
import type { SessionSummaryResponse } from "@/app/api/__generated__/models/sessionSummaryResponse";
import { okData } from "@/app/api/helpers";
import { useChatStore } from "@/components/contextual/Chat/chat-store";
import { useBreakpoint } from "@/lib/hooks/useBreakpoint";
import { useSupabase } from "@/lib/supabase/hooks/useSupabase";
import { useQueryClient } from "@tanstack/react-query";
import { parseAsString, useQueryState } from "nuqs";
import { usePathname, useSearchParams } from "next/navigation";
import { useRef } from "react";
import { useCopilotStore } from "../../copilot-page-store";
import { useCopilotSessionId } from "../../useCopilotSessionId";
import { useEffect, useRef, useState } from "react";
import { useMobileDrawer } from "./components/MobileDrawer/useMobileDrawer";
import { getCurrentSessionId } from "./helpers";
import { useShellSessionList } from "./useShellSessionList";
import { useSessionsPagination } from "./components/SessionsList/useSessionsPagination";
import {
checkReadyToShowContent,
convertSessionDetailToSummary,
filterVisibleSessions,
getCurrentSessionId,
mergeCurrentSessionIntoList,
} from "./helpers";
export function useCopilotShell() {
const pathname = usePathname();
@@ -27,7 +31,7 @@ export function useCopilotShell() {
const isMobile =
breakpoint === "base" || breakpoint === "sm" || breakpoint === "md";
const { urlSessionId, setUrlSessionId } = useCopilotSessionId();
const [, setUrlSessionId] = useQueryState("sessionId", parseAsString);
const isOnHomepage = pathname === "/copilot";
const paramSessionId = searchParams.get("sessionId");
@@ -41,80 +45,123 @@ export function useCopilotShell() {
const paginationEnabled = !isMobile || isDrawerOpen || !!paramSessionId;
const {
sessions: accumulatedSessions,
isLoading: isSessionsLoading,
isFetching: isSessionsFetching,
hasNextPage,
areAllSessionsLoaded,
fetchNextPage,
reset: resetPagination,
} = useSessionsPagination({
enabled: paginationEnabled,
});
const currentSessionId = getCurrentSessionId(searchParams);
const { data: currentSessionData } = useGetV2GetSession(
currentSessionId || "",
{
const { data: currentSessionData, isLoading: isCurrentSessionLoading } =
useGetV2GetSession(currentSessionId || "", {
query: {
enabled: !!currentSessionId,
select: okData,
},
},
);
const {
sessions,
isLoading,
isSessionsFetching,
hasNextPage,
fetchNextPage,
resetPagination,
recentlyCreatedSessionsRef,
} = useShellSessionList({
paginationEnabled,
currentSessionId,
currentSessionData,
isOnHomepage,
paramSessionId,
});
const stopStream = useChatStore((s) => s.stopStream);
const onStreamComplete = useChatStore((s) => s.onStreamComplete);
const isStreaming = useCopilotStore((s) => s.isStreaming);
const isCreatingSession = useCopilotStore((s) => s.isCreatingSession);
const setIsSwitchingSession = useCopilotStore((s) => s.setIsSwitchingSession);
const openInterruptModal = useCopilotStore((s) => s.openInterruptModal);
const pendingActionRef = useRef<(() => void) | null>(null);
async function stopCurrentStream() {
if (!currentSessionId) return;
setIsSwitchingSession(true);
await new Promise<void>((resolve) => {
const unsubscribe = onStreamComplete((completedId) => {
if (completedId === currentSessionId) {
clearTimeout(timeout);
unsubscribe();
resolve();
}
});
const timeout = setTimeout(() => {
unsubscribe();
resolve();
}, 3000);
stopStream(currentSessionId);
});
queryClient.invalidateQueries({
queryKey: getGetV2GetSessionQueryKey(currentSessionId),
});
setIsSwitchingSession(false);
}
const [hasAutoSelectedSession, setHasAutoSelectedSession] = useState(false);
const hasAutoSelectedRef = useRef(false);
const recentlyCreatedSessionsRef = useRef<
Map<string, SessionSummaryResponse>
>(new Map());
function selectSession(sessionId: string) {
if (sessionId === currentSessionId) return;
if (recentlyCreatedSessionsRef.current.has(sessionId)) {
// Mark as auto-selected when sessionId is in URL
useEffect(() => {
if (paramSessionId && !hasAutoSelectedRef.current) {
hasAutoSelectedRef.current = true;
setHasAutoSelectedSession(true);
}
}, [paramSessionId]);
// On homepage without sessionId, mark as ready immediately
useEffect(() => {
if (isOnHomepage && !paramSessionId && !hasAutoSelectedRef.current) {
hasAutoSelectedRef.current = true;
setHasAutoSelectedSession(true);
}
}, [isOnHomepage, paramSessionId]);
// Invalidate sessions list when navigating to homepage (to show newly created sessions)
useEffect(() => {
if (isOnHomepage && !paramSessionId) {
queryClient.invalidateQueries({
queryKey: getGetV2GetSessionQueryKey(sessionId),
queryKey: getGetV2ListSessionsQueryKey(),
});
}
}, [isOnHomepage, paramSessionId, queryClient]);
// Track newly created sessions to ensure they stay visible even when switching away
useEffect(() => {
if (currentSessionId && currentSessionData) {
const isNewSession =
currentSessionData.updated_at === currentSessionData.created_at;
const isNotInAccumulated = !accumulatedSessions.some(
(s) => s.id === currentSessionId,
);
if (isNewSession || isNotInAccumulated) {
const summary = convertSessionDetailToSummary(currentSessionData);
recentlyCreatedSessionsRef.current.set(currentSessionId, summary);
}
}
}, [currentSessionId, currentSessionData, accumulatedSessions]);
// Clean up recently created sessions that are now in the accumulated list
useEffect(() => {
for (const sessionId of recentlyCreatedSessionsRef.current.keys()) {
if (accumulatedSessions.some((s) => s.id === sessionId)) {
recentlyCreatedSessionsRef.current.delete(sessionId);
}
}
}, [accumulatedSessions]);
// Reset pagination when query becomes disabled
const prevPaginationEnabledRef = useRef(paginationEnabled);
useEffect(() => {
if (prevPaginationEnabledRef.current && !paginationEnabled) {
resetPagination();
resetAutoSelect();
}
prevPaginationEnabledRef.current = paginationEnabled;
}, [paginationEnabled, resetPagination]);
const sessions = mergeCurrentSessionIntoList(
accumulatedSessions,
currentSessionId,
currentSessionData,
recentlyCreatedSessionsRef.current,
);
const visibleSessions = filterVisibleSessions(sessions);
const sidebarSelectedSessionId =
isOnHomepage && !paramSessionId ? null : currentSessionId;
const isReadyToShowContent = isOnHomepage
? true
: checkReadyToShowContent(
areAllSessionsLoaded,
paramSessionId,
accumulatedSessions,
isCurrentSessionLoading,
currentSessionData,
hasAutoSelectedSession,
);
function handleSelectSession(sessionId: string) {
setUrlSessionId(sessionId, { shallow: false });
if (isMobile) handleCloseDrawer();
}
function startNewChat() {
function handleNewChat() {
resetAutoSelect();
resetPagination();
queryClient.invalidateQueries({
queryKey: getGetV2ListSessionsQueryKey(),
@@ -123,31 +170,12 @@ export function useCopilotShell() {
if (isMobile) handleCloseDrawer();
}
function handleSessionClick(sessionId: string) {
if (sessionId === currentSessionId) return;
if (isStreaming) {
pendingActionRef.current = async () => {
await stopCurrentStream();
selectSession(sessionId);
};
openInterruptModal(pendingActionRef.current);
} else {
selectSession(sessionId);
}
function resetAutoSelect() {
hasAutoSelectedRef.current = false;
setHasAutoSelectedSession(false);
}
function handleNewChatClick() {
if (isStreaming) {
pendingActionRef.current = async () => {
await stopCurrentStream();
startNewChat();
};
openInterruptModal(pendingActionRef.current);
} else {
startNewChat();
}
}
const isLoading = isSessionsLoading && accumulatedSessions.length === 0;
return {
isMobile,
@@ -155,17 +183,17 @@ export function useCopilotShell() {
isLoggedIn,
hasActiveSession:
Boolean(currentSessionId) && (!isOnHomepage || Boolean(paramSessionId)),
isLoading: isLoading || isCreatingSession,
isCreatingSession,
sessions,
currentSessionId: urlSessionId,
isLoading,
sessions: visibleSessions,
currentSessionId: sidebarSelectedSessionId,
handleSelectSession,
handleOpenDrawer,
handleCloseDrawer,
handleDrawerOpenChange,
handleNewChatClick,
handleSessionClick,
handleNewChat,
hasNextPage,
isFetchingNextPage: isSessionsFetching,
fetchNextPage,
isReadyToShowContent,
};
}

View File

@@ -1,113 +0,0 @@
import { getGetV2ListSessionsQueryKey } from "@/app/api/__generated__/endpoints/chat/chat";
import type { SessionDetailResponse } from "@/app/api/__generated__/models/sessionDetailResponse";
import type { SessionSummaryResponse } from "@/app/api/__generated__/models/sessionSummaryResponse";
import { useChatStore } from "@/components/contextual/Chat/chat-store";
import { useQueryClient } from "@tanstack/react-query";
import { useEffect, useMemo, useRef } from "react";
import { useSessionsPagination } from "./components/SessionsList/useSessionsPagination";
import {
convertSessionDetailToSummary,
filterVisibleSessions,
mergeCurrentSessionIntoList,
} from "./helpers";
interface UseShellSessionListArgs {
paginationEnabled: boolean;
currentSessionId: string | null;
currentSessionData: SessionDetailResponse | null | undefined;
isOnHomepage: boolean;
paramSessionId: string | null;
}
export function useShellSessionList({
paginationEnabled,
currentSessionId,
currentSessionData,
isOnHomepage,
paramSessionId,
}: UseShellSessionListArgs) {
const queryClient = useQueryClient();
const onStreamComplete = useChatStore((s) => s.onStreamComplete);
const {
sessions: accumulatedSessions,
isLoading: isSessionsLoading,
isFetching: isSessionsFetching,
hasNextPage,
fetchNextPage,
reset: resetPagination,
} = useSessionsPagination({
enabled: paginationEnabled,
});
const recentlyCreatedSessionsRef = useRef<
Map<string, SessionSummaryResponse>
>(new Map());
useEffect(() => {
if (isOnHomepage && !paramSessionId) {
queryClient.invalidateQueries({
queryKey: getGetV2ListSessionsQueryKey(),
});
}
}, [isOnHomepage, paramSessionId, queryClient]);
useEffect(() => {
if (currentSessionId && currentSessionData) {
const isNewSession =
currentSessionData.updated_at === currentSessionData.created_at;
const isNotInAccumulated = !accumulatedSessions.some(
(s) => s.id === currentSessionId,
);
if (isNewSession || isNotInAccumulated) {
const summary = convertSessionDetailToSummary(currentSessionData);
recentlyCreatedSessionsRef.current.set(currentSessionId, summary);
}
}
}, [currentSessionId, currentSessionData, accumulatedSessions]);
useEffect(() => {
for (const sessionId of recentlyCreatedSessionsRef.current.keys()) {
if (accumulatedSessions.some((s) => s.id === sessionId)) {
recentlyCreatedSessionsRef.current.delete(sessionId);
}
}
}, [accumulatedSessions]);
useEffect(() => {
const unsubscribe = onStreamComplete(() => {
queryClient.invalidateQueries({
queryKey: getGetV2ListSessionsQueryKey(),
});
});
return unsubscribe;
}, [onStreamComplete, queryClient]);
const sessions = useMemo(
() =>
mergeCurrentSessionIntoList(
accumulatedSessions,
currentSessionId,
currentSessionData,
recentlyCreatedSessionsRef.current,
),
[accumulatedSessions, currentSessionId, currentSessionData],
);
const visibleSessions = useMemo(
() => filterVisibleSessions(sessions),
[sessions],
);
const isLoading = isSessionsLoading && accumulatedSessions.length === 0;
return {
sessions: visibleSessions,
isLoading,
isSessionsFetching,
hasNextPage,
fetchNextPage,
resetPagination,
recentlyCreatedSessionsRef,
};
}

View File

@@ -4,53 +4,51 @@ import { create } from "zustand";
interface CopilotStoreState {
isStreaming: boolean;
isSwitchingSession: boolean;
isCreatingSession: boolean;
isInterruptModalOpen: boolean;
pendingAction: (() => void) | null;
isNewChatModalOpen: boolean;
newChatHandler: (() => void) | null;
}
interface CopilotStoreActions {
setIsStreaming: (isStreaming: boolean) => void;
setIsSwitchingSession: (isSwitchingSession: boolean) => void;
setIsCreatingSession: (isCreating: boolean) => void;
openInterruptModal: (onConfirm: () => void) => void;
confirmInterrupt: () => void;
cancelInterrupt: () => void;
setNewChatHandler: (handler: (() => void) | null) => void;
requestNewChat: () => void;
confirmNewChat: () => void;
cancelNewChat: () => void;
}
type CopilotStore = CopilotStoreState & CopilotStoreActions;
export const useCopilotStore = create<CopilotStore>((set, get) => ({
isStreaming: false,
isSwitchingSession: false,
isCreatingSession: false,
isInterruptModalOpen: false,
pendingAction: null,
isNewChatModalOpen: false,
newChatHandler: null,
setIsStreaming(isStreaming) {
set({ isStreaming });
},
setIsSwitchingSession(isSwitchingSession) {
set({ isSwitchingSession });
setNewChatHandler(handler) {
set({ newChatHandler: handler });
},
setIsCreatingSession(isCreatingSession) {
set({ isCreatingSession });
requestNewChat() {
const { isStreaming, newChatHandler } = get();
if (isStreaming) {
set({ isNewChatModalOpen: true });
} else if (newChatHandler) {
newChatHandler();
}
},
openInterruptModal(onConfirm) {
set({ isInterruptModalOpen: true, pendingAction: onConfirm });
confirmNewChat() {
const { newChatHandler } = get();
set({ isNewChatModalOpen: false });
if (newChatHandler) {
newChatHandler();
}
},
confirmInterrupt() {
const { pendingAction } = get();
set({ isInterruptModalOpen: false, pendingAction: null });
if (pendingAction) pendingAction();
},
cancelInterrupt() {
set({ isInterruptModalOpen: false, pendingAction: null });
cancelNewChat() {
set({ isNewChatModalOpen: false });
},
}));
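
The store now owns the new-chat gating end to end: `requestNewChat` runs the registered handler immediately when idle, but mid-stream it only opens the confirmation modal and defers the handler to `confirmNewChat`. An illustrative walk-through, driving the store imperatively via `getState()` (the handler is a stand-in; real code would go through `setIsStreaming`):

```ts
import { useCopilotStore } from "./copilot-page-store";

useCopilotStore
  .getState()
  .setNewChatHandler(() => console.log("starting a new chat"));

// Idle: the handler runs immediately and no modal opens.
useCopilotStore.setState({ isStreaming: false });
useCopilotStore.getState().requestNewChat(); // logs "starting a new chat"

// Streaming: the request is gated behind the confirmation modal.
useCopilotStore.setState({ isStreaming: true });
useCopilotStore.getState().requestNewChat();
console.log(useCopilotStore.getState().isNewChatModalOpen); // true

// Confirming closes the modal and only then runs the handler.
useCopilotStore.getState().confirmNewChat(); // logs "starting a new chat"
```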

View File

@@ -1,5 +1,28 @@
import type { User } from "@supabase/supabase-js";
export type PageState =
| { type: "welcome" }
| { type: "newChat" }
| { type: "creating"; prompt: string }
| { type: "chat"; sessionId: string; initialPrompt?: string };
export function getInitialPromptFromState(
pageState: PageState,
storedInitialPrompt: string | undefined,
) {
if (storedInitialPrompt) return storedInitialPrompt;
if (pageState.type === "creating") return pageState.prompt;
if (pageState.type === "chat") return pageState.initialPrompt;
}
export function shouldResetToWelcome(pageState: PageState) {
return (
pageState.type !== "newChat" &&
pageState.type !== "creating" &&
pageState.type !== "welcome"
);
}
export function getGreetingName(user?: User | null): string {
if (!user) return "there";
const metadata = user.user_metadata as Record<string, unknown> | undefined;

View File

@@ -1,25 +1,25 @@
"use client";
import { Button } from "@/components/atoms/Button/Button";
import { Skeleton } from "@/components/atoms/Skeleton/Skeleton";
import { Text } from "@/components/atoms/Text/Text";
import { Chat } from "@/components/contextual/Chat/Chat";
import { ChatInput } from "@/components/contextual/Chat/components/ChatInput/ChatInput";
import { ChatLoader } from "@/components/contextual/Chat/components/ChatLoader/ChatLoader";
import { Dialog } from "@/components/molecules/Dialog/Dialog";
import { useCopilotStore } from "./copilot-page-store";
import { useCopilotPage } from "./useCopilotPage";
export default function CopilotPage() {
const { state, handlers } = useCopilotPage();
const isInterruptModalOpen = useCopilotStore((s) => s.isInterruptModalOpen);
const confirmInterrupt = useCopilotStore((s) => s.confirmInterrupt);
const cancelInterrupt = useCopilotStore((s) => s.cancelInterrupt);
const confirmNewChat = useCopilotStore((s) => s.confirmNewChat);
const {
greetingName,
quickActions,
isLoading,
hasSession,
initialPrompt,
pageState,
isNewChatModalOpen,
isReady,
} = state;
const {
@@ -27,16 +27,20 @@ export default function CopilotPage() {
startChatWithPrompt,
handleSessionNotFound,
handleStreamingChange,
handleCancelNewChat,
handleNewChatModalOpen,
} = handlers;
if (!isReady) return null;
if (hasSession) {
if (pageState.type === "chat") {
return (
<div className="flex h-full flex-col">
<Chat
key={pageState.sessionId ?? "welcome"}
className="flex-1"
initialPrompt={initialPrompt}
urlSessionId={pageState.sessionId}
initialPrompt={pageState.initialPrompt}
onSessionNotFound={handleSessionNotFound}
onStreamingChange={handleStreamingChange}
/>
@@ -44,33 +48,31 @@ export default function CopilotPage() {
title="Interrupt current chat?"
styling={{ maxWidth: 300, width: "100%" }}
controlled={{
isOpen: isInterruptModalOpen,
set: (open) => {
if (!open) cancelInterrupt();
},
isOpen: isNewChatModalOpen,
set: handleNewChatModalOpen,
}}
onClose={cancelInterrupt}
onClose={handleCancelNewChat}
>
<Dialog.Content>
<div className="flex flex-col gap-4">
<Text variant="body">
The current chat response will be interrupted. Are you sure you
want to continue?
want to start a new chat?
</Text>
<Dialog.Footer>
<Button
type="button"
variant="outline"
onClick={cancelInterrupt}
onClick={handleCancelNewChat}
>
Cancel
</Button>
<Button
type="button"
variant="primary"
onClick={confirmInterrupt}
onClick={confirmNewChat}
>
Continue
Start new chat
</Button>
</Dialog.Footer>
</div>
@@ -80,6 +82,19 @@ export default function CopilotPage() {
);
}
if (pageState.type === "newChat" || pageState.type === "creating") {
return (
<div className="flex h-full flex-1 flex-col items-center justify-center bg-[#f8f8f9]">
<div className="flex flex-col items-center gap-4">
<ChatLoader />
<Text variant="body" className="text-zinc-500">
Loading your chats...
</Text>
</div>
</div>
);
}
return (
<div className="flex h-full flex-1 items-center justify-center overflow-y-auto bg-[#f8f8f9] px-6 py-10">
<div className="w-full text-center">

View File

@@ -5,40 +5,79 @@ import {
import { useToast } from "@/components/molecules/Toast/use-toast";
import { getHomepageRoute } from "@/lib/constants";
import { useSupabase } from "@/lib/supabase/hooks/useSupabase";
import { useOnboarding } from "@/providers/onboarding/onboarding-provider";
import {
Flag,
type FlagValues,
useGetFlag,
} from "@/services/feature-flags/use-get-flag";
import { SessionKey, sessionStorage } from "@/services/storage/session-storage";
import * as Sentry from "@sentry/nextjs";
import { useQueryClient } from "@tanstack/react-query";
import { useFlags } from "launchdarkly-react-client-sdk";
import { useRouter } from "next/navigation";
import { useEffect } from "react";
import { useEffect, useReducer } from "react";
import { useCopilotStore } from "./copilot-page-store";
import { getGreetingName, getQuickActions } from "./helpers";
import { useCopilotSessionId } from "./useCopilotSessionId";
import { getGreetingName, getQuickActions, type PageState } from "./helpers";
import { useCopilotURLState } from "./useCopilotURLState";
type CopilotState = {
pageState: PageState;
initialPrompts: Record<string, string>;
previousSessionId: string | null;
};
type CopilotAction =
| { type: "setPageState"; pageState: PageState }
| { type: "setInitialPrompt"; sessionId: string; prompt: string }
| { type: "setPreviousSessionId"; sessionId: string | null };
function isSamePageState(next: PageState, current: PageState) {
if (next.type !== current.type) return false;
if (next.type === "creating" && current.type === "creating") {
return next.prompt === current.prompt;
}
if (next.type === "chat" && current.type === "chat") {
return (
next.sessionId === current.sessionId &&
next.initialPrompt === current.initialPrompt
);
}
return true;
}
function copilotReducer(
state: CopilotState,
action: CopilotAction,
): CopilotState {
if (action.type === "setPageState") {
if (isSamePageState(action.pageState, state.pageState)) return state;
return { ...state, pageState: action.pageState };
}
if (action.type === "setInitialPrompt") {
if (state.initialPrompts[action.sessionId] === action.prompt) return state;
return {
...state,
initialPrompts: {
...state.initialPrompts,
[action.sessionId]: action.prompt,
},
};
}
if (action.type === "setPreviousSessionId") {
if (state.previousSessionId === action.sessionId) return state;
return { ...state, previousSessionId: action.sessionId };
}
return state;
}
export function useCopilotPage() {
const router = useRouter();
const queryClient = useQueryClient();
const { user, isLoggedIn, isUserLoading } = useSupabase();
const { toast } = useToast();
const { completeStep } = useOnboarding();
const { urlSessionId, setUrlSessionId } = useCopilotSessionId();
const isNewChatModalOpen = useCopilotStore((s) => s.isNewChatModalOpen);
const setIsStreaming = useCopilotStore((s) => s.setIsStreaming);
const isCreating = useCopilotStore((s) => s.isCreatingSession);
const setIsCreating = useCopilotStore((s) => s.setIsCreatingSession);
// Complete VISIT_COPILOT onboarding step to grant $5 welcome bonus
useEffect(() => {
if (isLoggedIn) {
completeStep("VISIT_COPILOT");
}
}, [completeStep, isLoggedIn]);
const cancelNewChat = useCopilotStore((s) => s.cancelNewChat);
const isChatEnabled = useGetFlag(Flag.CHAT);
const flags = useFlags<FlagValues>();
@@ -49,27 +88,72 @@ export function useCopilotPage() {
const isFlagReady =
!isLaunchDarklyConfigured || flags[Flag.CHAT] !== undefined;
const [state, dispatch] = useReducer(copilotReducer, {
pageState: { type: "welcome" },
initialPrompts: {},
previousSessionId: null,
});
const greetingName = getGreetingName(user);
const quickActions = getQuickActions();
const hasSession = Boolean(urlSessionId);
const initialPrompt = urlSessionId
? getInitialPrompt(urlSessionId)
: undefined;
function setPageState(pageState: PageState) {
dispatch({ type: "setPageState", pageState });
}
useEffect(() => {
if (!isFlagReady) return;
if (isChatEnabled === false) {
router.replace(homepageRoute);
}
}, [homepageRoute, isChatEnabled, isFlagReady, router]);
function setInitialPrompt(sessionId: string, prompt: string) {
dispatch({ type: "setInitialPrompt", sessionId, prompt });
}
function setPreviousSessionId(sessionId: string | null) {
dispatch({ type: "setPreviousSessionId", sessionId });
}
const { setUrlSessionId } = useCopilotURLState({
pageState: state.pageState,
initialPrompts: state.initialPrompts,
previousSessionId: state.previousSessionId,
setPageState,
setInitialPrompt,
setPreviousSessionId,
});
useEffect(
function transitionNewChatToWelcome() {
if (state.pageState.type === "newChat") {
function setWelcomeState() {
dispatch({ type: "setPageState", pageState: { type: "welcome" } });
}
const timer = setTimeout(setWelcomeState, 300);
return function cleanup() {
clearTimeout(timer);
};
}
},
[state.pageState.type],
);
useEffect(
function ensureAccess() {
if (!isFlagReady) return;
if (isChatEnabled === false) {
router.replace(homepageRoute);
}
},
[homepageRoute, isChatEnabled, isFlagReady, router],
);
async function startChatWithPrompt(prompt: string) {
if (!prompt?.trim()) return;
if (isCreating) return;
if (state.pageState.type === "creating") return;
const trimmedPrompt = prompt.trim();
setIsCreating(true);
dispatch({
type: "setPageState",
pageState: { type: "creating", prompt: trimmedPrompt },
});
try {
const sessionResponse = await postV2CreateSession({
@@ -81,19 +165,27 @@ export function useCopilotPage() {
}
const sessionId = sessionResponse.data.id;
setInitialPrompt(sessionId, trimmedPrompt);
dispatch({
type: "setInitialPrompt",
sessionId,
prompt: trimmedPrompt,
});
await queryClient.invalidateQueries({
queryKey: getGetV2ListSessionsQueryKey(),
});
await setUrlSessionId(sessionId, { shallow: true });
await setUrlSessionId(sessionId, { shallow: false });
dispatch({
type: "setPageState",
pageState: { type: "chat", sessionId, initialPrompt: trimmedPrompt },
});
} catch (error) {
console.error("[CopilotPage] Failed to start chat:", error);
toast({ title: "Failed to start chat", variant: "destructive" });
Sentry.captureException(error);
} finally {
setIsCreating(false);
dispatch({ type: "setPageState", pageState: { type: "welcome" } });
}
}
@@ -109,13 +201,21 @@ export function useCopilotPage() {
setIsStreaming(isStreamingValue);
}
function handleCancelNewChat() {
cancelNewChat();
}
function handleNewChatModalOpen(isOpen: boolean) {
if (!isOpen) cancelNewChat();
}
return {
state: {
greetingName,
quickActions,
isLoading: isUserLoading,
hasSession,
initialPrompt,
pageState: state.pageState,
isNewChatModalOpen,
isReady: isFlagReady && isChatEnabled !== false && isLoggedIn,
},
handlers: {
@@ -123,32 +223,8 @@ export function useCopilotPage() {
startChatWithPrompt,
handleSessionNotFound,
handleStreamingChange,
handleCancelNewChat,
handleNewChatModalOpen,
},
};
}
function getInitialPrompt(sessionId: string): string | undefined {
try {
const prompts = JSON.parse(
sessionStorage.get(SessionKey.CHAT_INITIAL_PROMPTS) || "{}",
);
return prompts[sessionId];
} catch {
return undefined;
}
}
function setInitialPrompt(sessionId: string, prompt: string): void {
try {
const prompts = JSON.parse(
sessionStorage.get(SessionKey.CHAT_INITIAL_PROMPTS) || "{}",
);
prompts[sessionId] = prompt;
sessionStorage.set(
SessionKey.CHAT_INITIAL_PROMPTS,
JSON.stringify(prompts),
);
} catch {
// Ignore storage errors
}
}
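
A useful property of `copilotReducer` above is that it returns the existing state object whenever an action would change nothing, so React's `useReducer` bail-out skips the re-render. A small illustration (assuming the reducer were exported for testing):

```ts
import { copilotReducer } from "./useCopilotPage"; // assumed export, for illustration

const initial = {
  pageState: { type: "welcome" as const },
  initialPrompts: {},
  previousSessionId: null,
};

// Dispatching the page state we already have returns the identical object...
const same = copilotReducer(initial, {
  type: "setPageState",
  pageState: { type: "welcome" },
});
console.log(same === initial); // true — useReducer skips the re-render

// ...while a real transition produces a new state object.
const next = copilotReducer(initial, {
  type: "setPageState",
  pageState: { type: "creating", prompt: "build me an agent" },
});
console.log(next === initial); // false
```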

View File

@@ -1,10 +0,0 @@
import { parseAsString, useQueryState } from "nuqs";
export function useCopilotSessionId() {
const [urlSessionId, setUrlSessionId] = useQueryState(
"sessionId",
parseAsString,
);
return { urlSessionId, setUrlSessionId };
}

View File

@@ -0,0 +1,80 @@
import { parseAsString, useQueryState } from "nuqs";
import { useLayoutEffect } from "react";
import {
getInitialPromptFromState,
type PageState,
shouldResetToWelcome,
} from "./helpers";
interface UseCopilotUrlStateArgs {
pageState: PageState;
initialPrompts: Record<string, string>;
previousSessionId: string | null;
setPageState: (pageState: PageState) => void;
setInitialPrompt: (sessionId: string, prompt: string) => void;
setPreviousSessionId: (sessionId: string | null) => void;
}
export function useCopilotURLState({
pageState,
initialPrompts,
previousSessionId,
setPageState,
setInitialPrompt,
setPreviousSessionId,
}: UseCopilotUrlStateArgs) {
const [urlSessionId, setUrlSessionId] = useQueryState(
"sessionId",
parseAsString,
);
function syncSessionFromUrl() {
if (urlSessionId) {
if (pageState.type === "chat" && pageState.sessionId === urlSessionId) {
setPreviousSessionId(urlSessionId);
return;
}
const storedInitialPrompt = initialPrompts[urlSessionId];
const currentInitialPrompt = getInitialPromptFromState(
pageState,
storedInitialPrompt,
);
if (currentInitialPrompt) {
setInitialPrompt(urlSessionId, currentInitialPrompt);
}
setPageState({
type: "chat",
sessionId: urlSessionId,
initialPrompt: currentInitialPrompt,
});
setPreviousSessionId(urlSessionId);
return;
}
const wasInChat = previousSessionId !== null && pageState.type === "chat";
setPreviousSessionId(null);
if (wasInChat) {
setPageState({ type: "newChat" });
return;
}
if (shouldResetToWelcome(pageState)) {
setPageState({ type: "welcome" });
}
}
useLayoutEffect(syncSessionFromUrl, [
urlSessionId,
pageState.type,
previousSessionId,
initialPrompts,
]);
return {
urlSessionId,
setUrlSessionId,
};
}
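
`syncSessionFromUrl` runs inside `useLayoutEffect` so the page state is reconciled with the URL before the browser paints, avoiding a one-frame flash of the welcome view when deep-linking into a chat. A stripped-down sketch of the same idea (hypothetical hook, nuqs assumed as above):

```ts
import { parseAsString, useQueryState } from "nuqs";
import { useLayoutEffect, useState } from "react";

// Miniature URL -> state sync: derive the view from the sessionId query param
// synchronously, before paint.
export function useViewFromUrl() {
  const [sessionId] = useQueryState("sessionId", parseAsString);
  const [view, setView] = useState<"welcome" | "chat">("welcome");

  useLayoutEffect(() => {
    // Runs after DOM mutation but before paint, so a deep link straight into a
    // chat never flashes the welcome view first.
    setView(sessionId ? "chat" : "welcome");
  }, [sessionId]);

  return view;
}
```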

View File

@@ -4594,7 +4594,6 @@
"AGENT_NEW_RUN",
"AGENT_INPUT",
"CONGRATS",
"VISIT_COPILOT",
"MARKETPLACE_VISIT",
"BUILDER_OPEN"
],
@@ -8755,7 +8754,6 @@
"AGENT_NEW_RUN",
"AGENT_INPUT",
"CONGRATS",
"VISIT_COPILOT",
"GET_RESULTS",
"MARKETPLACE_VISIT",
"MARKETPLACE_ADD_AGENT",

View File

@@ -1,77 +0,0 @@
import { getServerAuthToken } from "@/lib/autogpt-server-api/helpers";
import { NextRequest, NextResponse } from "next/server";
const WHISPER_API_URL = "https://api.openai.com/v1/audio/transcriptions";
const MAX_FILE_SIZE = 25 * 1024 * 1024; // 25MB - Whisper's limit
function getExtensionFromMimeType(mimeType: string): string {
const subtype = mimeType.split("/")[1]?.split(";")[0];
return subtype || "webm";
}
export async function POST(request: NextRequest) {
const token = await getServerAuthToken();
if (!token) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
const apiKey = process.env.OPENAI_API_KEY;
if (!apiKey) {
return NextResponse.json(
{ error: "OpenAI API key not configured" },
{ status: 401 },
);
}
try {
const formData = await request.formData();
const audioFile = formData.get("audio");
if (!audioFile || !(audioFile instanceof Blob)) {
return NextResponse.json(
{ error: "No audio file provided" },
{ status: 400 },
);
}
if (audioFile.size > MAX_FILE_SIZE) {
return NextResponse.json(
{ error: "File too large. Maximum size is 25MB." },
{ status: 413 },
);
}
const ext = getExtensionFromMimeType(audioFile.type);
const whisperFormData = new FormData();
whisperFormData.append("file", audioFile, `recording.${ext}`);
whisperFormData.append("model", "whisper-1");
const response = await fetch(WHISPER_API_URL, {
method: "POST",
headers: {
Authorization: `Bearer ${apiKey}`,
},
body: whisperFormData,
});
if (!response.ok) {
const errorData = await response.json().catch(() => ({}));
console.error("Whisper API error:", errorData);
return NextResponse.json(
{ error: errorData.error?.message || "Transcription failed" },
{ status: response.status },
);
}
const result = await response.json();
return NextResponse.json({ text: result.text });
} catch (error) {
console.error("Transcription error:", error);
return NextResponse.json(
{ error: "Failed to process audio" },
{ status: 500 },
);
}
}

View File

@@ -1,17 +1,16 @@
"use client";
import { useCopilotSessionId } from "@/app/(platform)/copilot/useCopilotSessionId";
import { useCopilotStore } from "@/app/(platform)/copilot/copilot-page-store";
import { LoadingSpinner } from "@/components/atoms/LoadingSpinner/LoadingSpinner";
import { Text } from "@/components/atoms/Text/Text";
import { cn } from "@/lib/utils";
import { useEffect, useRef } from "react";
import { ChatContainer } from "./components/ChatContainer/ChatContainer";
import { ChatErrorState } from "./components/ChatErrorState/ChatErrorState";
import { ChatLoader } from "./components/ChatLoader/ChatLoader";
import { useChat } from "./useChat";
export interface ChatProps {
className?: string;
urlSessionId?: string | null;
initialPrompt?: string;
onSessionNotFound?: () => void;
onStreamingChange?: (isStreaming: boolean) => void;
@@ -19,13 +18,12 @@ export interface ChatProps {
export function Chat({
className,
urlSessionId,
initialPrompt,
onSessionNotFound,
onStreamingChange,
}: ChatProps) {
const { urlSessionId } = useCopilotSessionId();
const hasHandledNotFoundRef = useRef(false);
const isSwitchingSession = useCopilotStore((s) => s.isSwitchingSession);
const {
messages,
isLoading,
@@ -35,59 +33,49 @@ export function Chat({
sessionId,
createSession,
showLoader,
startPollingForOperation,
} = useChat({ urlSessionId });
useEffect(() => {
if (!onSessionNotFound) return;
if (!urlSessionId) return;
if (!isSessionNotFound || isLoading || isCreating) return;
if (hasHandledNotFoundRef.current) return;
hasHandledNotFoundRef.current = true;
onSessionNotFound();
}, [
onSessionNotFound,
urlSessionId,
isSessionNotFound,
isLoading,
isCreating,
]);
const shouldShowLoader =
(showLoader && (isLoading || isCreating)) || isSwitchingSession;
useEffect(
function handleMissingSession() {
if (!onSessionNotFound) return;
if (!urlSessionId) return;
if (!isSessionNotFound || isLoading || isCreating) return;
if (hasHandledNotFoundRef.current) return;
hasHandledNotFoundRef.current = true;
onSessionNotFound();
},
[onSessionNotFound, urlSessionId, isSessionNotFound, isLoading, isCreating],
);
return (
<div className={cn("flex h-full flex-col", className)}>
{/* Main Content */}
<main className="flex min-h-0 w-full flex-1 flex-col overflow-hidden bg-[#f8f8f9]">
{/* Loading State */}
{shouldShowLoader && (
{showLoader && (isLoading || isCreating) && (
<div className="flex flex-1 items-center justify-center">
<div className="flex flex-col items-center gap-3">
<LoadingSpinner size="large" className="text-neutral-400" />
<div className="flex flex-col items-center gap-4">
<ChatLoader />
<Text variant="body" className="text-zinc-500">
{isSwitchingSession
? "Switching chat..."
: "Loading your chat..."}
Loading your chats...
</Text>
</div>
</div>
)}
{/* Error State */}
{error && !isLoading && !isSwitchingSession && (
{error && !isLoading && (
<ChatErrorState error={error} onRetry={createSession} />
)}
{/* Session Content */}
{sessionId && !isLoading && !error && !isSwitchingSession && (
{sessionId && !isLoading && !error && (
<ChatContainer
sessionId={sessionId}
initialMessages={messages}
initialPrompt={initialPrompt}
className="flex-1"
onStreamingChange={onStreamingChange}
onOperationStarted={startPollingForOperation}
/>
)}
</main>

View File

@@ -58,17 +58,39 @@ function notifyStreamComplete(
}
}
function cleanupExpiredStreams(
completedStreams: Map<string, StreamResult>,
): Map<string, StreamResult> {
function cleanupCompletedStreams(completedStreams: Map<string, StreamResult>) {
const now = Date.now();
const cleaned = new Map(completedStreams);
for (const [sessionId, result] of cleaned) {
for (const [sessionId, result] of completedStreams) {
if (now - result.completedAt > COMPLETED_STREAM_TTL) {
cleaned.delete(sessionId);
completedStreams.delete(sessionId);
}
}
return cleaned;
}
function moveToCompleted(
activeStreams: Map<string, ActiveStream>,
completedStreams: Map<string, StreamResult>,
streamCompleteCallbacks: Set<StreamCompleteCallback>,
sessionId: string,
) {
const stream = activeStreams.get(sessionId);
if (!stream) return;
const result: StreamResult = {
sessionId,
status: stream.status,
chunks: stream.chunks,
completedAt: Date.now(),
error: stream.error,
};
completedStreams.set(sessionId, result);
activeStreams.delete(sessionId);
cleanupCompletedStreams(completedStreams);
if (stream.status === "completed" || stream.status === "error") {
notifyStreamComplete(streamCompleteCallbacks, sessionId);
}
}
export const useChatStore = create<ChatStore>((set, get) => ({
@@ -84,31 +106,17 @@ export const useChatStore = create<ChatStore>((set, get) => ({
context,
onChunk,
) {
const state = get();
const newActiveStreams = new Map(state.activeStreams);
let newCompletedStreams = new Map(state.completedStreams);
const callbacks = state.streamCompleteCallbacks;
const { activeStreams, completedStreams, streamCompleteCallbacks } = get();
const existingStream = newActiveStreams.get(sessionId);
const existingStream = activeStreams.get(sessionId);
if (existingStream) {
existingStream.abortController.abort();
const normalizedStatus =
existingStream.status === "streaming"
? "completed"
: existingStream.status;
const result: StreamResult = {
moveToCompleted(
activeStreams,
completedStreams,
streamCompleteCallbacks,
sessionId,
status: normalizedStatus,
chunks: existingStream.chunks,
completedAt: Date.now(),
error: existingStream.error,
};
newCompletedStreams.set(sessionId, result);
newActiveStreams.delete(sessionId);
newCompletedStreams = cleanupExpiredStreams(newCompletedStreams);
if (normalizedStatus === "completed" || normalizedStatus === "error") {
notifyStreamComplete(callbacks, sessionId);
}
);
}
const abortController = new AbortController();
@@ -124,76 +132,36 @@ export const useChatStore = create<ChatStore>((set, get) => ({
onChunkCallbacks: initialCallbacks,
};
newActiveStreams.set(sessionId, stream);
set({
activeStreams: newActiveStreams,
completedStreams: newCompletedStreams,
});
activeStreams.set(sessionId, stream);
try {
await executeStream(stream, message, isUserMessage, context);
} finally {
if (onChunk) stream.onChunkCallbacks.delete(onChunk);
if (stream.status !== "streaming") {
const currentState = get();
const finalActiveStreams = new Map(currentState.activeStreams);
let finalCompletedStreams = new Map(currentState.completedStreams);
const storedStream = finalActiveStreams.get(sessionId);
if (storedStream === stream) {
const result: StreamResult = {
sessionId,
status: stream.status,
chunks: stream.chunks,
completedAt: Date.now(),
error: stream.error,
};
finalCompletedStreams.set(sessionId, result);
finalActiveStreams.delete(sessionId);
finalCompletedStreams = cleanupExpiredStreams(finalCompletedStreams);
set({
activeStreams: finalActiveStreams,
completedStreams: finalCompletedStreams,
});
if (stream.status === "completed" || stream.status === "error") {
notifyStreamComplete(
currentState.streamCompleteCallbacks,
sessionId,
);
}
}
moveToCompleted(
activeStreams,
completedStreams,
streamCompleteCallbacks,
sessionId,
);
}
}
},
stopStream: function stopStream(sessionId) {
const state = get();
const stream = state.activeStreams.get(sessionId);
if (!stream) return;
stream.abortController.abort();
stream.status = "completed";
const newActiveStreams = new Map(state.activeStreams);
let newCompletedStreams = new Map(state.completedStreams);
const result: StreamResult = {
sessionId,
status: stream.status,
chunks: stream.chunks,
completedAt: Date.now(),
error: stream.error,
};
newCompletedStreams.set(sessionId, result);
newActiveStreams.delete(sessionId);
newCompletedStreams = cleanupExpiredStreams(newCompletedStreams);
set({
activeStreams: newActiveStreams,
completedStreams: newCompletedStreams,
});
notifyStreamComplete(state.streamCompleteCallbacks, sessionId);
const { activeStreams, completedStreams, streamCompleteCallbacks } = get();
const stream = activeStreams.get(sessionId);
if (stream) {
stream.abortController.abort();
stream.status = "completed";
moveToCompleted(
activeStreams,
completedStreams,
streamCompleteCallbacks,
sessionId,
);
}
},
subscribeToStream: function subscribeToStream(
@@ -201,18 +169,16 @@ export const useChatStore = create<ChatStore>((set, get) => ({
onChunk,
skipReplay = false,
) {
const state = get();
const stream = state.activeStreams.get(sessionId);
const { activeStreams } = get();
const stream = activeStreams.get(sessionId);
if (stream) {
if (!skipReplay) {
for (const chunk of stream.chunks) {
onChunk(chunk);
}
}
stream.onChunkCallbacks.add(onChunk);
return function unsubscribe() {
stream.onChunkCallbacks.delete(onChunk);
};
@@ -238,12 +204,7 @@ export const useChatStore = create<ChatStore>((set, get) => ({
},
clearCompletedStream: function clearCompletedStream(sessionId) {
const state = get();
if (!state.completedStreams.has(sessionId)) return;
const newCompletedStreams = new Map(state.completedStreams);
newCompletedStreams.delete(sessionId);
set({ completedStreams: newCompletedStreams });
get().completedStreams.delete(sessionId);
},
isStreaming: function isStreaming(sessionId) {
@@ -252,21 +213,11 @@ export const useChatStore = create<ChatStore>((set, get) => ({
},
registerActiveSession: function registerActiveSession(sessionId) {
const state = get();
if (state.activeSessions.has(sessionId)) return;
const newActiveSessions = new Set(state.activeSessions);
newActiveSessions.add(sessionId);
set({ activeSessions: newActiveSessions });
get().activeSessions.add(sessionId);
},
unregisterActiveSession: function unregisterActiveSession(sessionId) {
const state = get();
if (!state.activeSessions.has(sessionId)) return;
const newActiveSessions = new Set(state.activeSessions);
newActiveSessions.delete(sessionId);
set({ activeSessions: newActiveSessions });
get().activeSessions.delete(sessionId);
},
isSessionActive: function isSessionActive(sessionId) {
@@ -274,16 +225,10 @@ export const useChatStore = create<ChatStore>((set, get) => ({
},
onStreamComplete: function onStreamComplete(callback) {
const state = get();
const newCallbacks = new Set(state.streamCompleteCallbacks);
newCallbacks.add(callback);
set({ streamCompleteCallbacks: newCallbacks });
const { streamCompleteCallbacks } = get();
streamCompleteCallbacks.add(callback);
return function unsubscribe() {
const currentState = get();
const cleanedCallbacks = new Set(currentState.streamCompleteCallbacks);
cleanedCallbacks.delete(callback);
set({ streamCompleteCallbacks: cleanedCallbacks });
streamCompleteCallbacks.delete(callback);
};
},
}));
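
Note the design shift in this refactor: the stream maps and callback sets are now mutated in place through `get()` instead of being copied and written back with `set()`. That pattern only works when consumers read the collections through accessor functions (such as `isStreaming`) rather than rendering from them, since mutation never notifies subscribers; in exchange it avoids allocation and notification churn on every chunk. A reduced sketch with an illustrative store:

```ts
import { create } from "zustand";

interface StreamRegistry {
  active: Map<string, AbortController>;
  register: (id: string) => void;
  cancel: (id: string) => void;
}

// The Map never changes identity, so zustand subscribers are not notified on
// register/cancel — fine as long as no component selects `active` to render.
const useStreamRegistry = create<StreamRegistry>((_set, get) => ({
  active: new Map(),
  register(id) {
    get().active.set(id, new AbortController());
  },
  cancel(id) {
    get().active.get(id)?.abort();
    get().active.delete(id);
  },
}));
```

The tradeoff: any future UI that wants to render from these maps would need immutable updates (or a version counter) reintroduced to trigger re-renders.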

View File

@@ -16,7 +16,6 @@ export interface ChatContainerProps {
initialPrompt?: string;
className?: string;
onStreamingChange?: (isStreaming: boolean) => void;
onOperationStarted?: () => void;
}
export function ChatContainer({
@@ -25,7 +24,6 @@ export function ChatContainer({
initialPrompt,
className,
onStreamingChange,
onOperationStarted,
}: ChatContainerProps) {
const {
messages,
@@ -40,7 +38,6 @@ export function ChatContainer({
sessionId,
initialMessages,
initialPrompt,
onOperationStarted,
});
useEffect(() => {

View File

@@ -22,7 +22,6 @@ export interface HandlerDependencies {
setIsStreamingInitiated: Dispatch<SetStateAction<boolean>>;
setIsRegionBlockedModalOpen: Dispatch<SetStateAction<boolean>>;
sessionId: string;
onOperationStarted?: () => void;
}
export function isRegionBlockedError(chunk: StreamChunk): boolean {
@@ -49,15 +48,6 @@ export function handleTextEnded(
const completedText = deps.streamingChunksRef.current.join("");
if (completedText.trim()) {
deps.setMessages((prev) => {
// Check if this exact message already exists to prevent duplicates
const exists = prev.some(
(msg) =>
msg.type === "message" &&
msg.role === "assistant" &&
msg.content === completedText,
);
if (exists) return prev;
const assistantMessage: ChatMessageData = {
type: "message",
role: "assistant",
@@ -164,11 +154,6 @@ export function handleToolResponse(
}
return;
}
// Trigger polling when operation_started is received
if (responseMessage.type === "operation_started") {
deps.onOperationStarted?.();
}
deps.setMessages((prev) => {
const toolCallIndex = prev.findIndex(
(msg) => msg.type === "tool_call" && msg.toolId === chunk.tool_id,
@@ -218,24 +203,13 @@ export function handleStreamEnd(
]);
}
if (completedContent.trim()) {
deps.setMessages((prev) => {
// Check if this exact message already exists to prevent duplicates
const exists = prev.some(
(msg) =>
msg.type === "message" &&
msg.role === "assistant" &&
msg.content === completedContent,
);
if (exists) return prev;
const assistantMessage: ChatMessageData = {
type: "message",
role: "assistant",
content: completedContent,
timestamp: new Date(),
};
return [...prev, assistantMessage];
});
const assistantMessage: ChatMessageData = {
type: "message",
role: "assistant",
content: completedContent,
timestamp: new Date(),
};
deps.setMessages((prev) => [...prev, assistantMessage]);
}
deps.setStreamingChunks([]);
deps.streamingChunksRef.current = [];

View File

@@ -304,7 +304,6 @@ export function parseToolResponse(
if (isAgentArray(agentsData)) {
return {
type: "agent_carousel",
toolId,
toolName: "agent_carousel",
agents: agentsData,
totalCount: parsedResult.total_count as number | undefined,
@@ -317,7 +316,6 @@ export function parseToolResponse(
if (responseType === "execution_started") {
return {
type: "execution_started",
toolId,
toolName: "execution_started",
executionId: (parsedResult.execution_id as string) || "",
agentName: (parsedResult.graph_name as string) || undefined,
@@ -343,41 +341,6 @@ export function parseToolResponse(
timestamp: timestamp || new Date(),
};
}
if (responseType === "operation_started") {
return {
type: "operation_started",
toolName: (parsedResult.tool_name as string) || toolName,
toolId,
operationId: (parsedResult.operation_id as string) || "",
message:
(parsedResult.message as string) ||
"Operation started. You can close this tab.",
timestamp: timestamp || new Date(),
};
}
if (responseType === "operation_pending") {
return {
type: "operation_pending",
toolName: (parsedResult.tool_name as string) || toolName,
toolId,
operationId: (parsedResult.operation_id as string) || "",
message:
(parsedResult.message as string) ||
"Operation in progress. Please wait...",
timestamp: timestamp || new Date(),
};
}
if (responseType === "operation_in_progress") {
return {
type: "operation_in_progress",
toolName: (parsedResult.tool_name as string) || toolName,
toolCallId: (parsedResult.tool_call_id as string) || toolId,
message:
(parsedResult.message as string) ||
"Operation already in progress. Please wait...",
timestamp: timestamp || new Date(),
};
}
if (responseType === "need_login") {
return {
type: "login_needed",

View File

@@ -14,40 +14,16 @@ import {
processInitialMessages,
} from "./helpers";
// Helper to generate deduplication key for a message
function getMessageKey(msg: ChatMessageData): string {
if (msg.type === "message") {
// Don't include timestamp - dedupe by role + content only
// This handles the case where local and server timestamps differ
// Server messages are authoritative, so duplicates from local state are filtered
return `msg:${msg.role}:${msg.content}`;
} else if (msg.type === "tool_call") {
return `toolcall:${msg.toolId}`;
} else if (msg.type === "tool_response") {
return `toolresponse:${(msg as any).toolId}`;
} else if (
msg.type === "operation_started" ||
msg.type === "operation_pending" ||
msg.type === "operation_in_progress"
) {
return `op:${(msg as any).toolId || (msg as any).operationId || (msg as any).toolCallId || ""}:${msg.toolName}`;
} else {
return `${msg.type}:${JSON.stringify(msg).slice(0, 100)}`;
}
}
interface Args {
sessionId: string | null;
initialMessages: SessionDetailResponse["messages"];
initialPrompt?: string;
onOperationStarted?: () => void;
}
export function useChatContainer({
sessionId,
initialMessages,
initialPrompt,
onOperationStarted,
}: Args) {
const [messages, setMessages] = useState<ChatMessageData[]>([]);
const [streamingChunks, setStreamingChunks] = useState<string[]>([]);
@@ -97,102 +73,20 @@ export function useChatContainer({
setIsRegionBlockedModalOpen,
sessionId,
setIsStreamingInitiated,
onOperationStarted,
});
setIsStreamingInitiated(true);
const skipReplay = initialMessages.length > 0;
return subscribeToStream(sessionId, dispatcher, skipReplay);
},
[
sessionId,
stopStreaming,
activeStreams,
subscribeToStream,
onOperationStarted,
],
[sessionId, stopStreaming, activeStreams, subscribeToStream],
);
// Collect toolIds from completed tool results in initialMessages
// Used to filter out operation messages when their results arrive
const completedToolIds = useMemo(() => {
const processedInitial = processInitialMessages(initialMessages);
const ids = new Set<string>();
for (const msg of processedInitial) {
if (
msg.type === "tool_response" ||
msg.type === "agent_carousel" ||
msg.type === "execution_started"
) {
const toolId = (msg as any).toolId;
if (toolId) {
ids.add(toolId);
}
}
}
return ids;
}, [initialMessages]);
// Clean up local operation messages when their completed results arrive from polling
// This effect runs when completedToolIds changes (i.e., when polling brings new results)
useEffect(
function cleanupCompletedOperations() {
if (completedToolIds.size === 0) return;
setMessages((prev) => {
const filtered = prev.filter((msg) => {
if (
msg.type === "operation_started" ||
msg.type === "operation_pending" ||
msg.type === "operation_in_progress"
) {
const toolId = (msg as any).toolId || (msg as any).toolCallId;
if (toolId && completedToolIds.has(toolId)) {
return false; // Remove - operation completed
}
}
return true;
});
// Only update state if something was actually filtered
return filtered.length === prev.length ? prev : filtered;
});
},
[completedToolIds],
const allMessages = useMemo(
() => [...processInitialMessages(initialMessages), ...messages],
[initialMessages, messages],
);
// Combine initial messages from backend with local streaming messages,
// Server messages maintain correct order; only append truly new local messages
const allMessages = useMemo(() => {
const processedInitial = processInitialMessages(initialMessages);
// Build a set of keys from server messages for deduplication
const serverKeys = new Set<string>();
for (const msg of processedInitial) {
serverKeys.add(getMessageKey(msg));
}
// Filter local messages: remove duplicates and completed operation messages
const newLocalMessages = messages.filter((msg) => {
// Remove operation messages for completed tools
if (
msg.type === "operation_started" ||
msg.type === "operation_pending" ||
msg.type === "operation_in_progress"
) {
const toolId = (msg as any).toolId || (msg as any).toolCallId;
if (toolId && completedToolIds.has(toolId)) {
return false;
}
}
// Remove messages that already exist in server data
const key = getMessageKey(msg);
return !serverKeys.has(key);
});
// Server messages first (correct order), then new local messages
return [...processedInitial, ...newLocalMessages];
}, [initialMessages, messages, completedToolIds]);
async function sendMessage(
content: string,
isUserMessage: boolean = true,
@@ -224,7 +118,6 @@ export function useChatContainer({
setIsRegionBlockedModalOpen,
sessionId,
setIsStreamingInitiated,
onOperationStarted,
});
try {

View File

@@ -1,14 +1,7 @@
import { Button } from "@/components/atoms/Button/Button";
import { cn } from "@/lib/utils";
import {
ArrowUpIcon,
CircleNotchIcon,
MicrophoneIcon,
StopIcon,
} from "@phosphor-icons/react";
import { RecordingIndicator } from "./components/RecordingIndicator";
import { ArrowUpIcon, StopIcon } from "@phosphor-icons/react";
import { useChatInput } from "./useChatInput";
import { useVoiceRecording } from "./useVoiceRecording";
export interface Props {
onSend: (message: string) => void;
@@ -28,35 +21,13 @@ export function ChatInput({
className,
}: Props) {
const inputId = "chat-input";
const {
value,
setValue,
handleKeyDown: baseHandleKeyDown,
handleSubmit,
handleChange,
hasMultipleLines,
} = useChatInput({
onSend,
disabled: disabled || isStreaming,
maxRows: 4,
inputId,
});
const {
isRecording,
isTranscribing,
elapsedTime,
toggleRecording,
handleKeyDown,
showMicButton,
isInputDisabled,
} = useVoiceRecording({
setValue,
disabled: disabled || isStreaming,
isStreaming,
value,
baseHandleKeyDown,
});
const { value, handleKeyDown, handleSubmit, handleChange, hasMultipleLines } =
useChatInput({
onSend,
disabled: disabled || isStreaming,
maxRows: 4,
inputId,
});
return (
<form onSubmit={handleSubmit} className={cn("relative flex-1", className)}>
@@ -64,11 +35,8 @@ export function ChatInput({
<div
id={`${inputId}-wrapper`}
className={cn(
"relative overflow-hidden border bg-white shadow-sm",
"focus-within:ring-1",
isRecording
? "border-red-400 focus-within:border-red-400 focus-within:ring-red-400"
: "border-neutral-200 focus-within:border-zinc-400 focus-within:ring-zinc-400",
"relative overflow-hidden border border-neutral-200 bg-white shadow-sm",
"focus-within:border-zinc-400 focus-within:ring-1 focus-within:ring-zinc-400",
hasMultipleLines ? "rounded-xlarge" : "rounded-full",
)}
>
@@ -78,91 +46,48 @@ export function ChatInput({
value={value}
onChange={handleChange}
onKeyDown={handleKeyDown}
placeholder={
isTranscribing
? "Transcribing..."
: isRecording
? ""
: placeholder
}
disabled={isInputDisabled}
placeholder={placeholder}
disabled={disabled || isStreaming}
rows={1}
className={cn(
"w-full resize-none overflow-y-auto border-0 bg-transparent text-[1rem] leading-6 text-black",
"placeholder:text-zinc-400",
"focus:outline-none focus:ring-0",
"disabled:text-zinc-500",
hasMultipleLines
? "pb-6 pl-4 pr-4 pt-2"
: showMicButton
? "pb-4 pl-14 pr-14 pt-4"
: "pb-4 pl-4 pr-14 pt-4",
hasMultipleLines ? "pb-6 pl-4 pr-4 pt-2" : "pb-4 pl-4 pr-14 pt-4",
)}
/>
{isRecording && !value && (
<div className="pointer-events-none absolute inset-0 flex items-center justify-center">
<RecordingIndicator elapsedTime={elapsedTime} />
</div>
)}
</div>
<span id="chat-input-hint" className="sr-only">
Press Enter to send, Shift+Enter for new line, Space to record voice
Press Enter to send, Shift+Enter for new line
</span>
{showMicButton && (
<div className="absolute bottom-[7px] left-2 flex items-center gap-1">
<Button
type="button"
variant="icon"
size="icon"
aria-label={isRecording ? "Stop recording" : "Start recording"}
onClick={toggleRecording}
disabled={disabled || isTranscribing}
className={cn(
isRecording
? "animate-pulse border-red-500 bg-red-500 text-white hover:border-red-600 hover:bg-red-600"
: isTranscribing
? "border-zinc-300 bg-zinc-100 text-zinc-400"
: "border-zinc-300 bg-white text-zinc-500 hover:border-zinc-400 hover:bg-zinc-50 hover:text-zinc-700",
)}
>
{isTranscribing ? (
<CircleNotchIcon className="h-4 w-4 animate-spin" />
) : (
<MicrophoneIcon className="h-4 w-4" weight="bold" />
)}
</Button>
</div>
{isStreaming ? (
<Button
type="button"
variant="icon"
size="icon"
aria-label="Stop generating"
onClick={onStop}
className="absolute bottom-[7px] right-2 border-red-600 bg-red-600 text-white hover:border-red-800 hover:bg-red-800"
>
<StopIcon className="h-4 w-4" weight="bold" />
</Button>
) : (
<Button
type="submit"
variant="icon"
size="icon"
aria-label="Send message"
className={cn(
"absolute bottom-[7px] right-2 border-zinc-800 bg-zinc-800 text-white hover:border-zinc-900 hover:bg-zinc-900",
(disabled || !value.trim()) && "opacity-20",
)}
disabled={disabled || !value.trim()}
>
<ArrowUpIcon className="h-4 w-4" weight="bold" />
</Button>
)}
<div className="absolute bottom-[7px] right-2 flex items-center gap-1">
{isStreaming ? (
<Button
type="button"
variant="icon"
size="icon"
aria-label="Stop generating"
onClick={onStop}
className="border-red-600 bg-red-600 text-white hover:border-red-800 hover:bg-red-800"
>
<StopIcon className="h-4 w-4" weight="bold" />
</Button>
) : (
<Button
type="submit"
variant="icon"
size="icon"
aria-label="Send message"
className={cn(
"border-zinc-800 bg-zinc-800 text-white hover:border-zinc-900 hover:bg-zinc-900",
(disabled || !value.trim() || isRecording) && "opacity-20",
)}
disabled={disabled || !value.trim() || isRecording}
>
<ArrowUpIcon className="h-4 w-4" weight="bold" />
</Button>
)}
</div>
</div>
</form>
);

View File
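
The sr-only hint above documents the submit behavior. A minimal sketch of the assumed Enter handling inside useChatInput, whose body is not part of this diff:

import type { KeyboardEvent } from "react";

function handleEnterToSend(
  event: KeyboardEvent<HTMLTextAreaElement>,
  submit: () => void,
) {
  // Enter sends; Shift+Enter falls through so the textarea inserts a newline.
  if (event.key === "Enter" && !event.shiftKey) {
    event.preventDefault();
    submit();
  }
}
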

@@ -1,41 +0,0 @@
import { formatElapsedTime } from "../helpers";
type Props = {
elapsedTime: number;
};
export function RecordingIndicator({ elapsedTime }: Props) {
return (
<div className="flex items-center gap-3">
<div className="flex items-center gap-[3px]">
{[0, 1, 2, 3, 4].map((i) => (
<div
key={i}
className="w-[3px] rounded-full bg-red-500"
style={{
animation: `waveform 1s ease-in-out infinite`,
animationDelay: `${i * 0.1}s`,
height: "16px",
}}
/>
))}
</div>
<span className="min-w-[3ch] text-sm font-medium text-red-500">
{formatElapsedTime(elapsedTime)}
</span>
<style jsx>{`
@keyframes waveform {
0%,
100% {
transform: scaleY(0.3);
opacity: 0.5;
}
50% {
transform: scaleY(1);
opacity: 1;
}
}
`}</style>
</div>
);
}

View File

@@ -1,6 +0,0 @@
export function formatElapsedTime(ms: number): string {
const seconds = Math.floor(ms / 1000);
const minutes = Math.floor(seconds / 60);
const remainingSeconds = seconds % 60;
return `${minutes}:${remainingSeconds.toString().padStart(2, "0")}`;
}

View File
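
Example outputs of the deleted formatElapsedTime helper, for the record:

formatElapsedTime(0); // "0:00"
formatElapsedTime(65_000); // "1:05"
formatElapsedTime(119_999); // "1:59" (floors to whole seconds)
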

@@ -6,7 +6,7 @@ import {
useState,
} from "react";
interface Args {
interface UseChatInputArgs {
onSend: (message: string) => void;
disabled?: boolean;
maxRows?: number;
@@ -18,7 +18,7 @@ export function useChatInput({
disabled = false,
maxRows = 5,
inputId = "chat-input",
}: Args) {
}: UseChatInputArgs) {
const [value, setValue] = useState("");
const [hasMultipleLines, setHasMultipleLines] = useState(false);

View File

@@ -1,239 +0,0 @@
import { useToast } from "@/components/molecules/Toast/use-toast";
import React, {
KeyboardEvent,
useCallback,
useEffect,
useRef,
useState,
} from "react";
const MAX_RECORDING_DURATION = 2 * 60 * 1000; // 2 minutes in ms
interface Args {
setValue: React.Dispatch<React.SetStateAction<string>>;
disabled?: boolean;
isStreaming?: boolean;
value: string;
baseHandleKeyDown: (event: KeyboardEvent<HTMLTextAreaElement>) => void;
}
export function useVoiceRecording({
setValue,
disabled = false,
isStreaming = false,
value,
baseHandleKeyDown,
}: Args) {
const [isRecording, setIsRecording] = useState(false);
const [isTranscribing, setIsTranscribing] = useState(false);
const [error, setError] = useState<string | null>(null);
const [elapsedTime, setElapsedTime] = useState(0);
const mediaRecorderRef = useRef<MediaRecorder | null>(null);
const chunksRef = useRef<Blob[]>([]);
const timerRef = useRef<NodeJS.Timeout | null>(null);
const startTimeRef = useRef<number>(0);
const streamRef = useRef<MediaStream | null>(null);
const isRecordingRef = useRef(false);
const isSupported =
typeof window !== "undefined" &&
!!(navigator.mediaDevices && navigator.mediaDevices.getUserMedia);
const clearTimer = useCallback(() => {
if (timerRef.current) {
clearInterval(timerRef.current);
timerRef.current = null;
}
}, []);
const cleanup = useCallback(() => {
clearTimer();
if (streamRef.current) {
streamRef.current.getTracks().forEach((track) => track.stop());
streamRef.current = null;
}
mediaRecorderRef.current = null;
chunksRef.current = [];
setElapsedTime(0);
}, [clearTimer]);
const handleTranscription = useCallback(
(text: string) => {
setValue((prev) => {
const trimmedPrev = prev.trim();
if (trimmedPrev) {
return `${trimmedPrev} ${text}`;
}
return text;
});
},
[setValue],
);
const transcribeAudio = useCallback(
async (audioBlob: Blob) => {
setIsTranscribing(true);
setError(null);
try {
const formData = new FormData();
formData.append("audio", audioBlob);
const response = await fetch("/api/transcribe", {
method: "POST",
body: formData,
});
if (!response.ok) {
const data = await response.json().catch(() => ({}));
throw new Error(data.error || "Transcription failed");
}
const data = await response.json();
if (data.text) {
handleTranscription(data.text);
}
} catch (err) {
const message =
err instanceof Error ? err.message : "Transcription failed";
setError(message);
console.error("Transcription error:", err);
} finally {
setIsTranscribing(false);
}
},
[handleTranscription],
);
const stopRecording = useCallback(() => {
if (mediaRecorderRef.current && isRecordingRef.current) {
mediaRecorderRef.current.stop();
isRecordingRef.current = false;
setIsRecording(false);
clearTimer();
}
}, [clearTimer]);
const startRecording = useCallback(async () => {
if (disabled || isRecordingRef.current || isTranscribing) return;
setError(null);
chunksRef.current = [];
try {
const stream = await navigator.mediaDevices.getUserMedia({ audio: true });
streamRef.current = stream;
const mediaRecorder = new MediaRecorder(stream, {
mimeType: MediaRecorder.isTypeSupported("audio/webm")
? "audio/webm"
: "audio/mp4",
});
mediaRecorderRef.current = mediaRecorder;
mediaRecorder.ondataavailable = (event) => {
if (event.data.size > 0) {
chunksRef.current.push(event.data);
}
};
mediaRecorder.onstop = async () => {
const audioBlob = new Blob(chunksRef.current, {
type: mediaRecorder.mimeType,
});
// Cleanup stream
if (streamRef.current) {
streamRef.current.getTracks().forEach((track) => track.stop());
streamRef.current = null;
}
if (audioBlob.size > 0) {
await transcribeAudio(audioBlob);
}
};
mediaRecorder.start(1000); // Collect data every second
isRecordingRef.current = true;
setIsRecording(true);
startTimeRef.current = Date.now();
// Start elapsed time timer
timerRef.current = setInterval(() => {
const elapsed = Date.now() - startTimeRef.current;
setElapsedTime(elapsed);
// Auto-stop at max duration
if (elapsed >= MAX_RECORDING_DURATION) {
stopRecording();
}
}, 100);
} catch (err) {
console.error("Failed to start recording:", err);
if (err instanceof DOMException && err.name === "NotAllowedError") {
setError("Microphone permission denied");
} else {
setError("Failed to access microphone");
}
cleanup();
}
}, [disabled, isTranscribing, stopRecording, transcribeAudio, cleanup]);
const toggleRecording = useCallback(() => {
if (isRecording) {
stopRecording();
} else {
startRecording();
}
}, [isRecording, startRecording, stopRecording]);
const { toast } = useToast();
useEffect(() => {
if (error) {
toast({
title: "Voice recording failed",
description: error,
variant: "destructive",
});
}
}, [error, toast]);
const handleKeyDown = useCallback(
(event: KeyboardEvent<HTMLTextAreaElement>) => {
if (event.key === " " && !value.trim() && !isTranscribing) {
event.preventDefault();
toggleRecording();
return;
}
baseHandleKeyDown(event);
},
[value, isTranscribing, toggleRecording, baseHandleKeyDown],
);
const showMicButton = isSupported && !isStreaming;
const isInputDisabled = disabled || isStreaming || isTranscribing;
// Cleanup on unmount
useEffect(() => {
return () => {
cleanup();
};
}, [cleanup]);
return {
isRecording,
isTranscribing,
error,
elapsedTime,
startRecording,
stopRecording,
toggleRecording,
isSupported,
handleKeyDown,
showMicButton,
isInputDisabled,
};
}

View File
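
The deleted hook above wraps the standard MediaRecorder flow. Condensed to its core as a sketch (not the removed implementation; the 1s timeslice comes from the code above):

// Record from an already-acquired stream for `ms` milliseconds and return
// the assembled audio blob, releasing the microphone when done.
async function recordOnce(stream: MediaStream, ms: number): Promise<Blob> {
  const chunks: Blob[] = [];
  const recorder = new MediaRecorder(stream);
  recorder.ondataavailable = (e) => {
    if (e.data.size > 0) chunks.push(e.data);
  };
  const stopped = new Promise<void>((resolve) => {
    recorder.onstop = () => resolve();
  });
  recorder.start(1000); // flush a chunk every second, as above
  setTimeout(() => recorder.stop(), ms);
  await stopped;
  stream.getTracks().forEach((track) => track.stop()); // release the mic
  return new Blob(chunks, { type: recorder.mimeType });
}
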

@@ -16,7 +16,6 @@ import { AuthPromptWidget } from "../AuthPromptWidget/AuthPromptWidget";
import { ChatCredentialsSetup } from "../ChatCredentialsSetup/ChatCredentialsSetup";
import { ClarificationQuestionsWidget } from "../ClarificationQuestionsWidget/ClarificationQuestionsWidget";
import { ExecutionStartedMessage } from "../ExecutionStartedMessage/ExecutionStartedMessage";
import { PendingOperationWidget } from "../PendingOperationWidget/PendingOperationWidget";
import { MarkdownContent } from "../MarkdownContent/MarkdownContent";
import { NoResultsMessage } from "../NoResultsMessage/NoResultsMessage";
import { ToolCallMessage } from "../ToolCallMessage/ToolCallMessage";
@@ -72,9 +71,6 @@ export function ChatMessage({
isLoginNeeded,
isCredentialsNeeded,
isClarificationNeeded,
isOperationStarted,
isOperationPending,
isOperationInProgress,
} = useChatMessage(message);
const displayContent = getDisplayContent(message, isUser);
@@ -130,6 +126,10 @@ export function ChatMessage({
[displayContent, message],
);
function isLongResponse(content: string): boolean {
return content.split("\n").length > 5;
}
const handleTryAgain = useCallback(() => {
if (message.type !== "message" || !onSendMessage) return;
onSendMessage(message.content, message.role === "user");
@@ -294,42 +294,6 @@ export function ChatMessage({
);
}
// Render operation_started messages (long-running background operations)
if (isOperationStarted && message.type === "operation_started") {
return (
<PendingOperationWidget
status="started"
message={message.message}
toolName={message.toolName}
className={className}
/>
);
}
// Render operation_pending messages (operations in progress when refreshing)
if (isOperationPending && message.type === "operation_pending") {
return (
<PendingOperationWidget
status="pending"
message={message.message}
toolName={message.toolName}
className={className}
/>
);
}
// Render operation_in_progress messages (duplicate request while operation running)
if (isOperationInProgress && message.type === "operation_in_progress") {
return (
<PendingOperationWidget
status="in_progress"
message={message.message}
toolName={message.toolName}
className={className}
/>
);
}
// Render tool response messages (but skip agent_output if it's being rendered inside assistant message)
if (isToolResponse && message.type === "tool_response") {
return (
@@ -394,7 +358,7 @@ export function ChatMessage({
<ArrowsClockwiseIcon className="size-4 text-zinc-600" />
</Button>
)}
{!isUser && isFinalMessage && !isStreaming && (
{!isUser && isFinalMessage && isLongResponse(displayContent) && (
<Button
variant="ghost"
size="icon"

View File
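
The isLongResponse heuristic introduced above gates the trailing action button on final assistant messages by counting newline-delimited lines:

isLongResponse("short reply"); // false (1 line)
isLongResponse("a\nb\nc\nd\ne\nf"); // true (6 lines > 5)
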

@@ -61,7 +61,6 @@ export type ChatMessageData =
}
| {
type: "agent_carousel";
toolId: string;
toolName: string;
agents: Array<{
id: string;
@@ -75,7 +74,6 @@ export type ChatMessageData =
}
| {
type: "execution_started";
toolId: string;
toolName: string;
executionId: string;
agentName?: string;
@@ -105,29 +103,6 @@ export type ChatMessageData =
message: string;
sessionId: string;
timestamp?: string | Date;
}
| {
type: "operation_started";
toolName: string;
toolId: string;
operationId: string;
message: string;
timestamp?: string | Date;
}
| {
type: "operation_pending";
toolName: string;
toolId: string;
operationId: string;
message: string;
timestamp?: string | Date;
}
| {
type: "operation_in_progress";
toolName: string;
toolCallId: string;
message: string;
timestamp?: string | Date;
};
export function useChatMessage(message: ChatMessageData) {
@@ -149,8 +124,5 @@ export function useChatMessage(message: ChatMessageData) {
isExecutionStarted: message.type === "execution_started",
isInputsNeeded: message.type === "inputs_needed",
isClarificationNeeded: message.type === "clarification_needed",
isOperationStarted: message.type === "operation_started",
isOperationPending: message.type === "operation_pending",
isOperationInProgress: message.type === "operation_in_progress",
};
}

View File
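
ChatMessageData above is a discriminated union on the `type` field, which is what lets useChatMessage and the render branches narrow each variant safely. A minimal analogue:

type Msg =
  | { type: "execution_started"; executionId: string }
  | { type: "clarification_needed"; questions: string[] };

function describe(m: Msg): string {
  switch (m.type) {
    case "execution_started":
      return `run ${m.executionId}`; // narrowed to the execution variant
    case "clarification_needed":
      return `${m.questions.length} question(s)`; // narrowed likewise
  }
}
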

@@ -30,7 +30,6 @@ export function ClarificationQuestionsWidget({
className,
}: Props) {
const [answers, setAnswers] = useState<Record<string, string>>({});
const [isSubmitted, setIsSubmitted] = useState(false);
function handleAnswerChange(keyword: string, value: string) {
setAnswers((prev) => ({ ...prev, [keyword]: value }));
@@ -42,42 +41,11 @@ export function ClarificationQuestionsWidget({
if (!allAnswered) {
return;
}
setIsSubmitted(true);
onSubmitAnswers(answers);
}
const allAnswered = questions.every((q) => answers[q.keyword]?.trim());
// Show submitted state after answers are submitted
if (isSubmitted) {
return (
<div
className={cn(
"group relative flex w-full justify-start gap-3 px-4 py-3",
className,
)}
>
<div className="flex w-full max-w-3xl gap-3">
<div className="flex-shrink-0">
<div className="flex h-7 w-7 items-center justify-center rounded-lg bg-green-500">
<CheckCircleIcon className="h-4 w-4 text-white" weight="bold" />
</div>
</div>
<div className="flex min-w-0 flex-1 flex-col">
<Card className="p-4">
<Text variant="h4" className="mb-1 text-slate-900">
Answers submitted
</Text>
<Text variant="small" className="text-slate-600">
Processing your responses...
</Text>
</Card>
</div>
</div>
</div>
);
}
return (
<div
className={cn(

View File
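
The allAnswered gate above treats whitespace-only answers as missing; in isolation:

const questions = [{ keyword: "goal" }, { keyword: "tone" }];
const answers: Record<string, string> = { goal: "summarize", tone: "  " };
const allAnswered = questions.every((q) => answers[q.keyword]?.trim());
// false: "tone" trims to "" (falsy), so submission stays blocked
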

@@ -1,109 +0,0 @@
"use client";
import { Card } from "@/components/atoms/Card/Card";
import { Text } from "@/components/atoms/Text/Text";
import { cn } from "@/lib/utils";
import { CircleNotch, CheckCircle, XCircle } from "@phosphor-icons/react";
type OperationStatus =
| "pending"
| "started"
| "in_progress"
| "completed"
| "error";
interface Props {
status: OperationStatus;
message: string;
toolName?: string;
className?: string;
}
function getOperationTitle(toolName?: string): string {
if (!toolName) return "Operation";
// Convert tool name to human-readable format
// e.g., "create_agent" -> "Creating Agent", "edit_agent" -> "Editing Agent"
if (toolName === "create_agent") return "Creating Agent";
if (toolName === "edit_agent") return "Editing Agent";
// Default: capitalize and format tool name
return toolName
.split("_")
.map((word) => word.charAt(0).toUpperCase() + word.slice(1))
.join(" ");
}
export function PendingOperationWidget({
status,
message,
toolName,
className,
}: Props) {
const isPending =
status === "pending" || status === "started" || status === "in_progress";
const isCompleted = status === "completed";
const isError = status === "error";
const operationTitle = getOperationTitle(toolName);
return (
<div
className={cn(
"group relative flex w-full justify-start gap-3 px-4 py-3",
className,
)}
>
<div className="flex w-full max-w-3xl gap-3">
<div className="flex-shrink-0">
<div
className={cn(
"flex h-7 w-7 items-center justify-center rounded-lg",
isPending && "bg-blue-500",
isCompleted && "bg-green-500",
isError && "bg-red-500",
)}
>
{isPending && (
<CircleNotch
className="h-4 w-4 animate-spin text-white"
weight="bold"
/>
)}
{isCompleted && (
<CheckCircle className="h-4 w-4 text-white" weight="bold" />
)}
{isError && (
<XCircle className="h-4 w-4 text-white" weight="bold" />
)}
</div>
</div>
<div className="flex min-w-0 flex-1 flex-col">
<Card className="space-y-2 p-4">
<div>
<Text variant="h4" className="mb-1 text-slate-900">
{isPending && operationTitle}
{isCompleted && `${operationTitle} Complete`}
{isError && `${operationTitle} Failed`}
</Text>
<Text variant="small" className="text-slate-600">
{message}
</Text>
</div>
{isPending && (
<Text variant="small" className="italic text-slate-500">
Check your library in a few minutes.
</Text>
)}
{toolName && (
<Text variant="small" className="text-slate-400">
Tool: {toolName}
</Text>
)}
</Card>
</div>
</div>
</div>
);
}

View File
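
getOperationTitle in the deleted widget special-cases two tools and otherwise title-cases the snake_case name:

getOperationTitle("create_agent"); // "Creating Agent" (special case)
getOperationTitle("fetch_results"); // "Fetch Results" (generic path)
getOperationTitle(undefined); // "Operation"
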

@@ -10,7 +10,7 @@ export function UserChatBubble({ children, className }: UserChatBubbleProps) {
return (
<div
className={cn(
"group relative min-w-20 overflow-hidden rounded-xl bg-purple-100 px-3 text-left text-[1rem] leading-relaxed transition-all duration-500 ease-in-out",
"group relative min-w-20 overflow-hidden rounded-xl bg-purple-100 px-3 text-right text-[1rem] leading-relaxed transition-all duration-500 ease-in-out",
className,
)}
style={{

View File

@@ -26,7 +26,6 @@ export function useChat({ urlSessionId }: UseChatArgs = {}) {
claimSession,
clearSession: clearSessionBase,
loadSession,
startPollingForOperation,
} = useChatSession({
urlSessionId,
autoCreate: false,
@@ -95,6 +94,5 @@ export function useChat({ urlSessionId }: UseChatArgs = {}) {
loadSession,
sessionId: sessionIdFromHook,
showLoader,
startPollingForOperation,
};
}

View File

@@ -59,7 +59,6 @@ export function useChatSession({
query: {
enabled: !!sessionId,
select: okData,
staleTime: 0,
retry: shouldRetrySessionLoad,
retryDelay: getSessionRetryDelay,
},
@@ -103,123 +102,15 @@ export function useChatSession({
}
}, [createError, loadError]);
// Track if we should be polling (set by external callers when they receive operation_started via SSE)
const [forcePolling, setForcePolling] = useState(false);
// Track if we've seen server acknowledge the pending operation (to avoid clearing forcePolling prematurely)
const hasSeenServerPendingRef = useRef(false);
// Check if there are any pending operations in the messages
// Must check all operation types: operation_pending, operation_started, operation_in_progress
const hasPendingOperationsFromServer = useMemo(() => {
if (!messages || messages.length === 0) return false;
const pendingTypes = new Set([
"operation_pending",
"operation_in_progress",
"operation_started",
]);
return messages.some((msg) => {
if (msg.role !== "tool" || !msg.content) return false;
try {
const content =
typeof msg.content === "string"
? JSON.parse(msg.content)
: msg.content;
return pendingTypes.has(content?.type);
} catch {
return false;
}
});
}, [messages]);
// Track when server has acknowledged the pending operation
useEffect(() => {
if (hasPendingOperationsFromServer) {
hasSeenServerPendingRef.current = true;
}
}, [hasPendingOperationsFromServer]);
// Combined: poll if server has pending ops OR if we received operation_started via SSE
const hasPendingOperations = hasPendingOperationsFromServer || forcePolling;
// Clear forcePolling only after server has acknowledged AND completed the operation
useEffect(() => {
if (
forcePolling &&
!hasPendingOperationsFromServer &&
hasSeenServerPendingRef.current
) {
// Server acknowledged the operation and it's now complete
setForcePolling(false);
hasSeenServerPendingRef.current = false;
}
}, [forcePolling, hasPendingOperationsFromServer]);
// Function to trigger polling (called when operation_started is received via SSE)
function startPollingForOperation() {
setForcePolling(true);
hasSeenServerPendingRef.current = false; // Reset for new operation
}
// Refresh sessions list when a pending operation completes
// (hasPendingOperations transitions from true to false)
const prevHasPendingOperationsRef = useRef(hasPendingOperations);
useEffect(
function refreshSessionsListOnOperationComplete() {
const wasHasPending = prevHasPendingOperationsRef.current;
prevHasPendingOperationsRef.current = hasPendingOperations;
// Only invalidate when transitioning from pending to not pending
if (wasHasPending && !hasPendingOperations && sessionId) {
function refreshSessionsListOnLoad() {
if (sessionId && sessionData && !isLoadingSession) {
queryClient.invalidateQueries({
queryKey: getGetV2ListSessionsQueryKey(),
});
}
},
[hasPendingOperations, sessionId, queryClient],
);
// Poll for updates when there are pending operations
// Backoff: 2s, 4s, 6s, 8s, 10s, ... up to 30s max
const pollAttemptRef = useRef(0);
const hasPendingOperationsRef = useRef(hasPendingOperations);
hasPendingOperationsRef.current = hasPendingOperations;
useEffect(
function pollForPendingOperations() {
if (!sessionId || !hasPendingOperations) {
pollAttemptRef.current = 0;
return;
}
let cancelled = false;
let timeoutId: ReturnType<typeof setTimeout> | null = null;
function schedule() {
// 2s, 4s, 6s, 8s, 10s, ... 30s (max)
const delay = Math.min((pollAttemptRef.current + 1) * 2000, 30000);
timeoutId = setTimeout(async () => {
if (cancelled) return;
pollAttemptRef.current += 1;
try {
await refetch();
} catch (err) {
console.error("[useChatSession] Poll failed:", err);
} finally {
if (!cancelled && hasPendingOperationsRef.current) {
schedule();
}
}
}, delay);
}
schedule();
return () => {
cancelled = true;
if (timeoutId) clearTimeout(timeoutId);
};
},
[sessionId, hasPendingOperations, refetch],
[sessionId, sessionData, isLoadingSession, queryClient],
);
async function createSession() {
@@ -348,13 +239,11 @@ export function useChatSession({
isCreating,
error,
isSessionNotFound: isNotFoundError(loadError),
hasPendingOperations,
createSession,
loadSession,
refreshSession,
claimSession,
clearSession,
startPollingForOperation,
};
}

View File
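
The deleted poller's schedule, isolated: the delay grows linearly by 2s per attempt and caps at 30s, matching the removed comment.

function pollDelayMs(attempt: number): number {
  return Math.min((attempt + 1) * 2000, 30_000);
}
pollDelayMs(0); // 2000 — first retry after 2s
pollDelayMs(4); // 10000
pollDelayMs(20); // 30000 (capped)
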

@@ -255,18 +255,13 @@ export function Wallet() {
(notification: WebSocketNotification) => {
if (
notification.type !== "onboarding" ||
notification.event !== "step_completed"
notification.event !== "step_completed" ||
!walletRef.current
) {
return;
}
// Always refresh credits when any onboarding step completes
fetchCredits();
// Only trigger confetti for tasks that are in displayed groups
if (!walletRef.current) {
return;
}
// Only trigger confetti for tasks that are in groups
const taskIds = groups
.flatMap((group) => group.tasks)
.map((task) => task.id);
@@ -279,6 +274,7 @@ export function Wallet() {
return;
}
fetchCredits();
party.confetti(walletRef.current, {
count: 30,
spread: 120,
@@ -288,7 +284,7 @@ export function Wallet() {
modules: [fadeOut],
});
},
[fetchCredits, fadeOut, groups],
[fetchCredits, fadeOut],
);
// WebSocket setup for onboarding notifications

View File

@@ -1003,7 +1003,6 @@ export type OnboardingStep =
| "AGENT_INPUT"
| "CONGRATS"
// First Wins
| "VISIT_COPILOT"
| "GET_RESULTS"
| "MARKETPLACE_VISIT"
| "MARKETPLACE_ADD_AGENT"

View File

@@ -3,7 +3,6 @@ import { environment } from "../environment";
export enum SessionKey {
CHAT_SENT_INITIAL_PROMPTS = "chat_sent_initial_prompts",
CHAT_INITIAL_PROMPTS = "chat_initial_prompts",
}
function get(key: SessionKey) {

View File

@@ -37,13 +37,9 @@ export class LoginPage {
this.page.on("load", (page) => console.log(` Now at URL: ${page.url()}`));
// Start waiting for navigation before clicking
// Wait for redirect to marketplace, onboarding, library, or copilot (new landing pages)
const leaveLoginPage = this.page
.waitForURL(
(url: URL) =>
/^\/(marketplace|onboarding(\/.*)?|library|copilot)?$/.test(
url.pathname,
),
(url) => /^\/(marketplace|onboarding(\/.*)?)?$/.test(url.pathname),
{ timeout: 10_000 },
)
.catch((reason) => {

View File
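
The tightened matcher above now accepts only the root, marketplace, and onboarding paths:

const allowed = /^\/(marketplace|onboarding(\/.*)?)?$/;
allowed.test("/"); // true — bare root
allowed.test("/marketplace"); // true
allowed.test("/onboarding/step-2"); // true
allowed.test("/library"); // false after this change
allowed.test("/copilot"); // false after this change
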

@@ -36,16 +36,14 @@ export async function signupTestUser(
const signupButton = getButton("Sign up");
await signupButton.click();
// Wait for successful signup - could redirect to various pages depending on onboarding state
// Wait for successful signup - could redirect to onboarding or marketplace
try {
// Wait for redirect to onboarding, marketplace, copilot, or library
// Use a single waitForURL with a callback to avoid Promise.race race conditions
await page.waitForURL(
(url: URL) =>
/\/(onboarding|marketplace|copilot|library)/.test(url.pathname),
{ timeout: 15000 },
);
// Wait for either onboarding or marketplace redirect
await Promise.race([
page.waitForURL(/\/onboarding/, { timeout: 15000 }),
page.waitForURL(/\/marketplace/, { timeout: 15000 }),
]);
} catch (error) {
console.error(
"❌ Timeout waiting for redirect, current URL:",
@@ -56,19 +54,14 @@ export async function signupTestUser(
const currentUrl = page.url();
// Handle onboarding redirect if needed
// Handle onboarding or marketplace redirect
if (currentUrl.includes("/onboarding") && ignoreOnboarding) {
await page.goto("http://localhost:3000/marketplace");
await page.waitForLoadState("domcontentloaded", { timeout: 10000 });
}
// Verify we're on an expected final page and user is authenticated
if (currentUrl.includes("/copilot") || currentUrl.includes("/library")) {
// For copilot/library landing pages, just verify user is authenticated
await page
.getByTestId("profile-popout-menu-trigger")
.waitFor({ state: "visible", timeout: 10000 });
} else if (ignoreOnboarding || currentUrl.includes("/marketplace")) {
// Verify we're on the expected final page
if (ignoreOnboarding || currentUrl.includes("/marketplace")) {
// Verify we're on marketplace
await page
.getByText(

View File
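
The restored Promise.race pattern above resolves on whichever redirect lands first; note the losing waiter keeps running until its own timeout. As a reusable sketch (the helper name is ours, not from the diff):

import type { Page } from "@playwright/test";

// Hypothetical helper wrapping the wait used in signupTestUser above.
async function waitForEitherRedirect(page: Page): Promise<void> {
  await Promise.race([
    page.waitForURL(/\/onboarding/, { timeout: 15_000 }),
    page.waitForURL(/\/marketplace/, { timeout: 15_000 }),
  ]);
}
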

@@ -0,0 +1 @@
# Video editing blocks