Compare commits

..

12 Commits

Author SHA1 Message Date
Zamil Majdy
f47cd573af fix(copilot): workspace tools, text rendering, conversation context
- MorphingTextAnimation: add gap-0 to fix character spacing in tool status
- write_workspace_file: accept plain text `content` and `source_path`
  (copy from ephemeral dir) in addition to base64; fallback to plain text
  when base64 decode fails
- read_workspace_file: add `save_to_path` to copy workspace files to
  ephemeral dir for processing
- SDK system prompt: restructured with clear ephemeral vs persistent
  storage sections and file transfer instructions
- _format_conversation_context: include tool call summaries and truncated
  tool results so agent retains full context when transcript is unavailable
  or stale (root cause of "amnesia" between turns)
- Added diagnostic logging for transcript availability
2026-02-19 22:20:19 +07:00
Zamil Majdy
670f812f0f fix(copilot): skip gap detection when transcript metadata is missing
When transcript metadata is absent (old transcripts or failed write),
transcript_msg_count defaults to 0.  The condition >= 0 was always true,
causing the entire session history to be resent as a "gap" — duplicating
what the --resume transcript already contains.  Change to > 0 so we
only compute a gap when we actually know the upload watermark.
2026-02-19 16:03:15 +08:00
Zamil Majdy
ecfe4e6a7a fix(copilot): RPC DataError reconstruction, chat stream reconnection
Fix two issues:

1. RPC DataError deserialization crash: When the database-manager
   returns a 400 for a Prisma DataError/UniqueViolationError, the
   client-side reconstruction crashes because DataError.__init__
   expects a dict but exc.args only contains a string message.
   Wrap the string in the expected dict structure so the exception
   is properly caught by callers (e.g. workspace file overwrites).

2. Chat stream reconnection on page refresh: The AI SDK's built-in
   resume:true fires before message hydration completes, causing
   hydrated messages to overwrite the resumed stream. Replace with
   manual resumeStream() called after hydration + active_stream
   detection. Show the stop button immediately when an active stream
   is detected (isReconnecting flag) and prevent sending new messages
   until reconnected.
2026-02-19 15:50:51 +08:00
Otto (AGPT)
efb4b3b518 fix: Update _pending_tool_outputs type to dict[str, list[str]] 2026-02-19 02:42:05 +00:00
Otto (AGPT)
ebeab7fbe6 fix(copilot): Address GenericTool review comments
- Fix parseOutput treating arrays as objects (skip Array.isArray)
- Add React import for React.ReactNode type reference
- Differentiate web_fetch vs WebSearch title in accordion
2026-02-19 02:15:52 +00:00
Otto (AGPT)
98ef8a26ab fix(copilot): Address new review comments
- Guard metadata store() with try/except so failure doesn't orphan the
  already-uploaded transcript (coderabbit Major)
- Fix OrbitLoader size from 20 to 14 to match static icons
- Filter output.files to confirmed strings instead of unchecked cast
2026-02-19 01:57:47 +00:00
Otto (AGPT)
ed02e6db9e style: format GenericTool.tsx with prettier 2026-02-19 01:56:36 +00:00
Otto (AGPT)
6952334b85 fix(copilot): Address remaining review comments
- Tool output stashing: use FIFO queue per tool name instead of single
  value, so duplicate calls to the same tool in one turn each get their
  own output (fixes sentry HIGH/MEDIUM)
- Web accordion: show JSON fallback when output has no recognized text
  fields (fixes empty accordion body edge case)
- Cleanup dir logging: log when project dir not found
- Flush behavior and TodoItem cast are already correct as-is
2026-02-19 00:37:13 +00:00
Otto (AGPT)
0c586c2edf fix(copilot): Address PR review comments
- Shield transcript upload and session save from generator cancellation
  (asyncio.shield) so page refresh/disconnect doesn't lose the transcript
- Return content_base64 for small image files (not just text) so
  _extract_image_block can actually work
- Add 32KB size limit to _extract_image_block to prevent oversized images
- Fix gap fill when transcript_msg_count == 0 (metadata absent)
- Add truncation to files.join in GenericTool.tsx
2026-02-19 00:30:06 +00:00
Zamil Majdy
b6128dd75f feat(copilot): stream resume, transcript staleness detection, WebSearch display
- Enable `resume: true` on `useChat` with `prepareReconnectToStreamRequest`
  so page refresh reconnects to active backend streams via Redis replay
- Add `message_count` watermark + timestamp metadata to transcript uploads;
  on download, detect staleness and compress only the gap instead of the
  full history (hybrid: transcript via --resume + compressed missed turns)
- Fix WebSearch accordion showing empty by extracting text from MCP-style
  content blocks (`extractMcpText`) with raw JSON fallback
- Revert over-blocking: only `AskUserQuestion` added to SDK_DISALLOWED_TOOLS
  (removed EnterPlanMode, ExitPlanMode, Skill, NotebookEdit)
- Add defensive TodoItem filter per coderabbit review
- Fix service_test for TranscriptDownload return type change
2026-02-19 05:09:41 +05:30
Zamil Majdy
c4f5f7c8b8 Merge branch 'dev' into copilot/sdk-improvements 2026-02-19 00:14:23 +05:30
Zamil Majdy
8af4e0bf7d feat(copilot): SDK tool output, transcript resume, image support, GenericTool UI
- Fix SDK built-in tool outputs (WebSearch, Read, TodoWrite) not showing
  in frontend by stashing outputs via PostToolUse hook and flushing
  unresolved tool calls in response adapter
- Fix transcript-based --resume for multi-turn conversations: single
  clean upload block after async with, extracted _try_upload_transcript
  helper, removed redundant dual-strategy code
- Add image support in MCP tool results: detect workspace file responses
  with content_base64 and return MCP image content blocks so Claude can
  "see" small images (<32KB)
- Overhaul GenericTool.tsx with tool-specific icons, TodoWrite checklist
  rendering, WebSearch text display, and proper accordion content
- Downgrade 36 per-operation [TIMING]/[TASK_LOOKUP] diagnostic logs from
  info to debug in stream_registry.py and service.py
- Fix silent exceptions: add warning logs for swallowed ValueError/
  TypeError in stream_registry and Exception in service long-running path
- Clean up transcript.py: remove dead find_cli_transcript and
  read_fallback_transcript functions, simplify logging
2026-02-19 00:11:08 +05:30
37 changed files with 1171 additions and 1299 deletions

View File

@@ -18,7 +18,7 @@ from backend.copilot.completion_handler import (
process_operation_success,
)
from backend.copilot.config import ChatConfig
from backend.copilot.executor.utils import enqueue_cancel_task, enqueue_copilot_task
from backend.copilot.executor.utils import enqueue_copilot_task
from backend.copilot.model import (
ChatMessage,
ChatSession,
@@ -50,7 +50,6 @@ from backend.copilot.tools.models import (
OperationPendingResponse,
OperationStartedResponse,
SetupRequirementsResponse,
SuggestedGoalResponse,
UnderstandingUpdatedResponse,
)
from backend.copilot.tracking import track_user_message
@@ -132,14 +131,6 @@ class ListSessionsResponse(BaseModel):
total: int
class CancelTaskResponse(BaseModel):
"""Response model for the cancel task endpoint."""
cancelled: bool
task_id: str | None = None
reason: str | None = None
class OperationCompleteRequest(BaseModel):
"""Request model for external completion webhook."""
@@ -322,57 +313,6 @@ async def get_session(
)
@router.post(
"/sessions/{session_id}/cancel",
status_code=200,
)
async def cancel_session_task(
session_id: str,
user_id: Annotated[str | None, Depends(auth.get_user_id)],
) -> CancelTaskResponse:
"""Cancel the active streaming task for a session.
Publishes a cancel event to the executor via RabbitMQ FANOUT, then
polls Redis until the task status flips from ``running`` or a timeout
(5 s) is reached. Returns only after the cancellation is confirmed.
"""
await _validate_and_get_session(session_id, user_id)
active_task, _ = await stream_registry.get_active_task_for_session(
session_id, user_id
)
if not active_task:
return CancelTaskResponse(cancelled=False, reason="no_active_task")
task_id = active_task.task_id
await enqueue_cancel_task(task_id)
logger.info(
f"[CANCEL] Published cancel for task ...{task_id[-8:]} "
f"session ...{session_id[-8:]}"
)
# Poll until the executor confirms the task is no longer running.
# Keep max_wait below typical reverse-proxy read timeouts.
poll_interval = 0.5
max_wait = 5.0
waited = 0.0
while waited < max_wait:
await asyncio.sleep(poll_interval)
waited += poll_interval
task = await stream_registry.get_task(task_id)
if task is None or task.status != "running":
logger.info(
f"[CANCEL] Task ...{task_id[-8:]} confirmed stopped "
f"(status={task.status if task else 'gone'}) after {waited:.1f}s"
)
return CancelTaskResponse(cancelled=True, task_id=task_id)
logger.warning(f"[CANCEL] Task ...{task_id[-8:]} not confirmed after {max_wait}s")
return CancelTaskResponse(
cancelled=True, task_id=task_id, reason="cancel_published_not_confirmed"
)
@router.post(
"/sessions/{session_id}/stream",
)
@@ -1044,7 +984,6 @@ ToolResponseUnion = (
| AgentPreviewResponse
| AgentSavedResponse
| ClarificationNeededResponse
| SuggestedGoalResponse
| BlockListResponse
| BlockDetailsResponse
| BlockOutputResponse

View File

@@ -205,20 +205,3 @@ async def enqueue_copilot_task(
message=entry.model_dump_json(),
exchange=COPILOT_EXECUTION_EXCHANGE,
)
async def enqueue_cancel_task(task_id: str) -> None:
"""Publish a cancel request for a running CoPilot task.
Sends a ``CancelCoPilotEvent`` to the FANOUT exchange so all executor
pods receive the cancellation signal.
"""
from backend.util.clients import get_async_copilot_queue
event = CancelCoPilotEvent(task_id=task_id)
queue_client = await get_async_copilot_queue()
await queue_client.publish_message(
routing_key="", # FANOUT ignores routing key
message=event.model_dump_json(),
exchange=COPILOT_CANCEL_EXCHANGE,
)

View File

@@ -1,272 +0,0 @@
"""Tests for parallel tool call execution in CoPilot.
These tests mock _yield_tool_call to avoid importing the full copilot stack
which requires Prisma, DB connections, etc.
"""
import asyncio
import time
from typing import Any, cast
import pytest
@pytest.mark.asyncio
async def test_parallel_tool_calls_run_concurrently():
"""Multiple tool calls should complete in ~max(delays), not sum(delays)."""
# Import here to allow module-level mocking if needed
from backend.copilot.response_model import (
StreamToolInputAvailable,
StreamToolOutputAvailable,
)
from backend.copilot.service import _execute_tool_calls_parallel
n_tools = 3
delay_per_tool = 0.2
tool_calls = [
{
"id": f"call_{i}",
"type": "function",
"function": {"name": f"tool_{i}", "arguments": "{}"},
}
for i in range(n_tools)
]
# Minimal session mock
class FakeSession:
session_id = "test"
user_id = "test"
def __init__(self):
self.messages = []
original_yield = None
async def fake_yield(tc_list, idx, sess, lock=None):
yield StreamToolInputAvailable(
toolCallId=tc_list[idx]["id"],
toolName=tc_list[idx]["function"]["name"],
input={},
)
await asyncio.sleep(delay_per_tool)
yield StreamToolOutputAvailable(
toolCallId=tc_list[idx]["id"],
toolName=tc_list[idx]["function"]["name"],
output="{}",
)
import backend.copilot.service as svc
original_yield = svc._yield_tool_call
svc._yield_tool_call = fake_yield
try:
start = time.monotonic()
events = []
async for event in _execute_tool_calls_parallel(
tool_calls, cast(Any, FakeSession())
):
events.append(event)
elapsed = time.monotonic() - start
finally:
svc._yield_tool_call = original_yield
assert len(events) == n_tools * 2
# Parallel: should take ~delay, not ~n*delay
assert elapsed < delay_per_tool * (
n_tools - 0.5
), f"Took {elapsed:.2f}s, expected parallel (~{delay_per_tool}s)"
@pytest.mark.asyncio
async def test_single_tool_call_works():
"""Single tool call should work identically."""
from backend.copilot.response_model import (
StreamToolInputAvailable,
StreamToolOutputAvailable,
)
from backend.copilot.service import _execute_tool_calls_parallel
tool_calls = [
{
"id": "call_0",
"type": "function",
"function": {"name": "t", "arguments": "{}"},
}
]
class FakeSession:
session_id = "test"
user_id = "test"
def __init__(self):
self.messages = []
async def fake_yield(tc_list, idx, sess, lock=None):
yield StreamToolInputAvailable(toolCallId="call_0", toolName="t", input={})
yield StreamToolOutputAvailable(toolCallId="call_0", toolName="t", output="{}")
import backend.copilot.service as svc
orig = svc._yield_tool_call
svc._yield_tool_call = fake_yield
try:
events = [
e
async for e in _execute_tool_calls_parallel(
tool_calls, cast(Any, FakeSession())
)
]
finally:
svc._yield_tool_call = orig
assert len(events) == 2
@pytest.mark.asyncio
async def test_retryable_error_propagates():
"""Retryable errors should be raised after all tools finish."""
from backend.copilot.response_model import StreamToolOutputAvailable
from backend.copilot.service import _execute_tool_calls_parallel
tool_calls = [
{
"id": f"call_{i}",
"type": "function",
"function": {"name": f"t_{i}", "arguments": "{}"},
}
for i in range(2)
]
class FakeSession:
session_id = "test"
user_id = "test"
def __init__(self):
self.messages = []
async def fake_yield(tc_list, idx, sess, lock=None):
if idx == 1:
raise KeyError("bad")
from backend.copilot.response_model import StreamToolInputAvailable
yield StreamToolInputAvailable(
toolCallId=tc_list[idx]["id"], toolName="t_0", input={}
)
await asyncio.sleep(0.05)
yield StreamToolOutputAvailable(
toolCallId=tc_list[idx]["id"], toolName="t_0", output="{}"
)
import backend.copilot.service as svc
orig = svc._yield_tool_call
svc._yield_tool_call = fake_yield
try:
events = []
with pytest.raises(KeyError):
async for event in _execute_tool_calls_parallel(
tool_calls, cast(Any, FakeSession())
):
events.append(event)
# First tool's events should still be yielded
assert any(isinstance(e, StreamToolOutputAvailable) for e in events)
finally:
svc._yield_tool_call = orig
@pytest.mark.asyncio
async def test_session_lock_shared():
"""All parallel tools should receive the same lock instance."""
from backend.copilot.response_model import (
StreamToolInputAvailable,
StreamToolOutputAvailable,
)
from backend.copilot.service import _execute_tool_calls_parallel
tool_calls = [
{
"id": f"call_{i}",
"type": "function",
"function": {"name": f"t_{i}", "arguments": "{}"},
}
for i in range(3)
]
class FakeSession:
session_id = "test"
user_id = "test"
def __init__(self):
self.messages = []
observed_locks = []
async def fake_yield(tc_list, idx, sess, lock=None):
observed_locks.append(lock)
yield StreamToolInputAvailable(
toolCallId=tc_list[idx]["id"], toolName=f"t_{idx}", input={}
)
yield StreamToolOutputAvailable(
toolCallId=tc_list[idx]["id"], toolName=f"t_{idx}", output="{}"
)
import backend.copilot.service as svc
orig = svc._yield_tool_call
svc._yield_tool_call = fake_yield
try:
async for _ in _execute_tool_calls_parallel(
tool_calls, cast(Any, FakeSession())
):
pass
finally:
svc._yield_tool_call = orig
assert len(observed_locks) == 3
assert observed_locks[0] is observed_locks[1] is observed_locks[2]
assert isinstance(observed_locks[0], asyncio.Lock)
@pytest.mark.asyncio
async def test_cancellation_cleans_up():
"""Generator close should cancel in-flight tasks."""
from backend.copilot.response_model import StreamToolInputAvailable
from backend.copilot.service import _execute_tool_calls_parallel
tool_calls = [
{
"id": f"call_{i}",
"type": "function",
"function": {"name": f"t_{i}", "arguments": "{}"},
}
for i in range(2)
]
class FakeSession:
session_id = "test"
user_id = "test"
def __init__(self):
self.messages = []
started = asyncio.Event()
async def fake_yield(tc_list, idx, sess, lock=None):
yield StreamToolInputAvailable(
toolCallId=tc_list[idx]["id"], toolName=f"t_{idx}", input={}
)
started.set()
await asyncio.sleep(10) # simulate long-running
import backend.copilot.service as svc
orig = svc._yield_tool_call
svc._yield_tool_call = fake_yield
try:
gen = _execute_tool_calls_parallel(tool_calls, cast(Any, FakeSession()))
await gen.__anext__() # get first event
await started.wait()
await gen.aclose() # close generator
finally:
svc._yield_tool_call = orig
# If we get here without hanging, cleanup worked

View File

@@ -83,19 +83,42 @@ _SDK_TOOL_SUPPLEMENT = """
## Tool notes
### Shell commands
- The SDK built-in Bash tool is NOT available. Use the `bash_exec` MCP tool
for shell commands — it runs in a network-isolated sandbox.
- **Shared workspace**: The SDK Read/Write tools and `bash_exec` share the
same working directory. Files created by one are readable by the other.
- **IMPORTANT — File persistence**: Your working directory is **ephemeral** —
files are lost between turns. When you create or modify important files
(code, configs, outputs), you MUST save them using `write_workspace_file`
so they persist. Use `read_workspace_file` and `list_workspace_files` to
access files saved in previous turns. If a "Files from previous turns"
section is present above, those files are available via `read_workspace_file`.
- Long-running tools (create_agent, edit_agent, etc.) are handled
asynchronously. You will receive an immediate response; the actual result
is delivered to the user via a background stream.
### Two storage systems — CRITICAL to understand
1. **Ephemeral working directory** (`/tmp/copilot-<session>/`):
- Shared by SDK Read/Write/Edit/Glob/Grep tools AND `bash_exec`
- Files here are **lost between turns** — do NOT rely on them persisting
- Use for temporary work: running scripts, processing data, etc.
2. **Persistent workspace** (cloud storage):
- Files here **survive across turns and sessions**
- Use `write_workspace_file` to save important files (code, outputs, configs)
- Use `read_workspace_file` to retrieve previously saved files
- Use `list_workspace_files` to see what files you've saved before
- Call `list_workspace_files(include_all_sessions=True)` to see files from
all sessions
### Moving files between ephemeral and persistent storage
- **Ephemeral → Persistent**: Use `write_workspace_file` with either:
- `content` param (plain text) — for text files
- `source_path` param — to copy any file directly from the ephemeral dir
- **Persistent → Ephemeral**: Use `read_workspace_file` with `save_to_path`
param to download a workspace file to the ephemeral dir for processing
### File persistence workflow
When you create or modify important files (code, configs, outputs), you MUST:
1. Save them using `write_workspace_file` so they persist
2. At the start of a new turn, call `list_workspace_files` to see what files
are available from previous turns
### Long-running tools
Long-running tools (create_agent, edit_agent, etc.) are handled
asynchronously. You will receive an immediate response; the actual result
is delivered to the user via a background stream.
"""
@@ -374,11 +397,9 @@ async def _compress_conversation_history(
def _format_conversation_context(messages: list[ChatMessage]) -> str | None:
"""Format conversation messages into a context prefix for the user message.
Returns a string like:
<conversation_history>
User: hello
You responded: Hi! How can I help?
</conversation_history>
Includes user messages, assistant text, tool call summaries, and
tool result summaries so the agent retains full context about what
tools were invoked and their outcomes.
Returns None if there are no messages to format.
"""
@@ -387,13 +408,28 @@ def _format_conversation_context(messages: list[ChatMessage]) -> str | None:
lines: list[str] = []
for msg in messages:
if not msg.content:
continue
if msg.role == "user":
lines.append(f"User: {msg.content}")
if msg.content:
lines.append(f"User: {msg.content}")
elif msg.role == "assistant":
lines.append(f"You responded: {msg.content}")
# Skip tool messages — they're internal details
if msg.content:
lines.append(f"You responded: {msg.content}")
# Include tool call summaries
if msg.tool_calls:
for tc in msg.tool_calls:
func = tc.get("function", {})
tool_name = func.get("name", "unknown")
tool_args = func.get("arguments", "")
# Truncate long arguments
if len(tool_args) > 200:
tool_args = tool_args[:200] + "..."
lines.append(f"You called tool: {tool_name}({tool_args})")
elif msg.role == "tool":
# Include tool results (truncated to avoid context bloat)
content = msg.content or ""
if len(content) > 300:
content = content[:300] + "..."
lines.append(f"Tool result: {content}")
if not lines:
return None
@@ -519,6 +555,20 @@ async def stream_chat_completion_sdk(
if config.claude_agent_use_resume and user_id and len(session.messages) > 1:
dl = await download_transcript(user_id, session_id)
if dl and validate_transcript(dl.content):
logger.info(
f"[SDK] Transcript available for session {session_id}: "
f"{len(dl.content)}B, msg_count={dl.message_count}"
)
elif dl:
logger.warning(
f"[SDK] Transcript downloaded but invalid for {session_id}"
)
else:
logger.warning(
f"[SDK] No transcript available for {session_id} "
f"({len(session.messages)} messages in session)"
)
if dl and validate_transcript(dl.content):
resume_file = write_transcript_to_tempfile(
dl.content, session_id, sdk_cwd
@@ -611,7 +661,7 @@ async def stream_chat_completion_sdk(
f"Now, the user says:\n{current_message}"
)
logger.info(
logger.debug(
f"[SDK] Sending query ({len(session.messages)} msgs, "
f"resume={use_resume})"
)

View File

@@ -143,7 +143,7 @@ def read_transcript_file(transcript_path: str) -> str | None:
json.loads(lines[0])
json.loads(lines[-1])
logger.info(
logger.debug(
f"[Transcript] Read {len(lines)} lines, "
f"{len(content)} bytes from {transcript_path}"
)
@@ -234,7 +234,7 @@ def write_transcript_to_tempfile(
with open(jsonl_path, "w") as f:
f.write(transcript_content)
logger.info(f"[Transcript] Wrote resume file: {jsonl_path}")
logger.debug(f"[Transcript] Wrote resume file: {jsonl_path}")
return jsonl_path
except OSError as e:
@@ -357,7 +357,7 @@ async def upload_transcript(
try:
existing = await storage.retrieve(path)
if len(existing) >= new_size:
logger.info(
logger.debug(
f"[Transcript] Skipping upload — existing ({len(existing)}B) "
f">= new ({new_size}B) for session {session_id}"
)
@@ -439,7 +439,7 @@ async def download_transcript(
except (FileNotFoundError, json.JSONDecodeError, Exception):
pass # No metadata — treat as unknown (msg_count=0 → always fill gap)
logger.info(
logger.debug(
f"[Transcript] Downloaded {len(content)}B "
f"(msg_count={message_count}) for session {session_id}"
)

View File

@@ -118,8 +118,6 @@ Adapt flexibly to the conversation context. Not every interaction requires all s
- Find reusable components with `find_block`
- Create custom solutions with `create_agent` if nothing suitable exists
- Modify existing library agents with `edit_agent`
- **When `create_agent` returns `suggested_goal`**: Present the suggestion to the user and ask "Would you like me to proceed with this refined goal?" If they accept, call `create_agent` again with the suggested goal.
- **When `create_agent` returns `clarifying_questions`**: After the user answers, call `create_agent` again with the original description AND the answers in the `context` parameter.
5. **Execute**: Run automations immediately, schedule them, or set up webhooks using `run_agent`. Test specific components with `run_block`.
@@ -166,11 +164,6 @@ Adapt flexibly to the conversation context. Not every interaction requires all s
- Use `add_understanding` to capture valuable business context
- When tool calls fail, try alternative approaches
**Handle Feedback Loops:**
- When a tool returns a suggested alternative (like a refined goal), present it clearly and ask the user for confirmation before proceeding
- When clarifying questions are answered, immediately re-call the tool with the accumulated context
- Don't ask redundant questions if the user has already provided context in the conversation
## CRITICAL REMINDER
You are NOT a chatbot. You are NOT documentation. You are a partner who helps busy business owners get value quickly by showing proof through working automations. Bias toward action over explanation."""
@@ -394,7 +387,7 @@ async def stream_chat_completion(
if user_id:
log_meta["user_id"] = user_id
logger.info(
logger.debug(
f"[TIMING] stream_chat_completion STARTED, session={session_id}, user={user_id}, "
f"message_len={len(message) if message else 0}, is_user={is_user_message}",
extra={
@@ -411,7 +404,7 @@ async def stream_chat_completion(
fetch_start = time.monotonic()
session = await get_chat_session(session_id, user_id)
fetch_time = (time.monotonic() - fetch_start) * 1000
logger.info(
logger.debug(
f"[TIMING] get_chat_session took {fetch_time:.1f}ms, "
f"n_messages={len(session.messages) if session else 0}",
extra={
@@ -423,7 +416,7 @@ async def stream_chat_completion(
},
)
else:
logger.info(
logger.debug(
f"[TIMING] Using provided session, messages={len(session.messages)}",
extra={"json_fields": {**log_meta, "n_messages": len(session.messages)}},
)
@@ -457,7 +450,7 @@ async def stream_chat_completion(
message_length=len(message),
)
posthog_time = (time.monotonic() - posthog_start) * 1000
logger.info(
logger.debug(
f"[TIMING] track_user_message took {posthog_time:.1f}ms",
extra={"json_fields": {**log_meta, "duration_ms": posthog_time}},
)
@@ -465,7 +458,7 @@ async def stream_chat_completion(
upsert_start = time.monotonic()
session = await upsert_chat_session(session)
upsert_time = (time.monotonic() - upsert_start) * 1000
logger.info(
logger.debug(
f"[TIMING] upsert_chat_session took {upsert_time:.1f}ms",
extra={"json_fields": {**log_meta, "duration_ms": upsert_time}},
)
@@ -510,7 +503,7 @@ async def stream_chat_completion(
prompt_start = time.monotonic()
system_prompt, understanding = await _build_system_prompt(user_id)
prompt_time = (time.monotonic() - prompt_start) * 1000
logger.info(
logger.debug(
f"[TIMING] _build_system_prompt took {prompt_time:.1f}ms",
extra={"json_fields": {**log_meta, "duration_ms": prompt_time}},
)
@@ -544,7 +537,7 @@ async def stream_chat_completion(
# Only yield message start for the initial call, not for continuations.
setup_time = (time.monotonic() - completion_start) * 1000
logger.info(
logger.debug(
f"[TIMING] Setup complete, yielding StreamStart at {setup_time:.1f}ms",
extra={"json_fields": {**log_meta, "setup_time_ms": setup_time}},
)
@@ -555,7 +548,7 @@ async def stream_chat_completion(
yield StreamStartStep()
try:
logger.info(
logger.debug(
"[TIMING] Calling _stream_chat_chunks",
extra={"json_fields": log_meta},
)
@@ -995,7 +988,7 @@ async def _stream_chat_chunks(
if session.user_id:
log_meta["user_id"] = session.user_id
logger.info(
logger.debug(
f"[TIMING] _stream_chat_chunks STARTED, session={session.session_id}, "
f"user={session.user_id}, n_messages={len(session.messages)}",
extra={"json_fields": {**log_meta, "n_messages": len(session.messages)}},
@@ -1018,7 +1011,7 @@ async def _stream_chat_chunks(
base_url=config.base_url,
)
context_time = (time_module.perf_counter() - context_start) * 1000
logger.info(
logger.debug(
f"[TIMING] _manage_context_window took {context_time:.1f}ms",
extra={"json_fields": {**log_meta, "duration_ms": context_time}},
)
@@ -1060,7 +1053,7 @@ async def _stream_chat_chunks(
retry_info = (
f" (retry {retry_count}/{MAX_RETRIES})" if retry_count > 0 else ""
)
logger.info(
logger.debug(
f"[TIMING] Creating OpenAI stream at {elapsed:.1f}ms{retry_info}",
extra={
"json_fields": {
@@ -1100,7 +1093,7 @@ async def _stream_chat_chunks(
extra_body=extra_body,
)
api_init_time = (time_module.perf_counter() - api_call_start) * 1000
logger.info(
logger.debug(
f"[TIMING] OpenAI stream object returned in {api_init_time:.1f}ms",
extra={"json_fields": {**log_meta, "duration_ms": api_init_time}},
)
@@ -1149,7 +1142,7 @@ async def _stream_chat_chunks(
ttfc = (
time_module.perf_counter() - api_call_start
) * 1000
logger.info(
logger.debug(
f"[TIMING] FIRST CONTENT CHUNK at {ttfc:.1f}ms "
f"(since API call), n_chunks={chunk_count}",
extra={
@@ -1217,7 +1210,7 @@ async def _stream_chat_chunks(
)
emitted_start_for_idx.add(idx)
stream_duration = time_module.perf_counter() - api_call_start
logger.info(
logger.debug(
f"[TIMING] OpenAI stream COMPLETE, finish_reason={finish_reason}, "
f"duration={stream_duration:.2f}s, "
f"n_chunks={chunk_count}, n_tool_calls={len(tool_calls)}",
@@ -1232,13 +1225,26 @@ async def _stream_chat_chunks(
},
)
# Execute all accumulated tool calls in parallel
# Events are yielded as they arrive from each concurrent tool
async for event in _execute_tool_calls_parallel(tool_calls, session):
yield event
# Yield all accumulated tool calls after the stream is complete
# This ensures all tool call arguments have been fully received
for idx, tool_call in enumerate(tool_calls):
try:
async for tc in _yield_tool_call(tool_calls, idx, session):
yield tc
except (orjson.JSONDecodeError, KeyError, TypeError) as e:
logger.error(
f"Failed to parse tool call {idx}: {e}",
exc_info=True,
extra={"tool_call": tool_call},
)
yield StreamError(
errorText=f"Invalid tool call arguments for tool {tool_call.get('function', {}).get('name', 'unknown')}: {e}",
)
# Re-raise to trigger retry logic in the parent function
raise
total_time = (time_module.perf_counter() - stream_chunks_start) * 1000
logger.info(
logger.debug(
f"[TIMING] _stream_chat_chunks COMPLETED in {total_time / 1000:.1f}s; "
f"session={session.session_id}, user={session.user_id}",
extra={"json_fields": {**log_meta, "total_time_ms": total_time}},
@@ -1313,91 +1319,10 @@ async def _stream_chat_chunks(
return
async def _with_optional_lock(
lock: asyncio.Lock | None,
coro_fn: Any,
) -> Any:
"""Run *coro_fn()* under *lock* when provided, otherwise run directly."""
if lock:
async with lock:
return await coro_fn()
return await coro_fn()
async def _execute_tool_calls_parallel(
tool_calls: list[dict[str, Any]],
session: ChatSession,
) -> AsyncGenerator[StreamBaseResponse, None]:
"""Execute all tool calls concurrently, yielding stream events as they arrive.
Each tool runs as an ``asyncio.Task``, pushing events into a shared queue.
A ``session_lock`` serialises session-state mutations (long-running tool
bookkeeping, ``run_agent`` counters).
"""
queue: asyncio.Queue[StreamBaseResponse | None] = asyncio.Queue()
session_lock = asyncio.Lock()
n_tools = len(tool_calls)
retryable_errors: list[Exception] = []
async def _run_tool(idx: int) -> None:
tool_name = tool_calls[idx].get("function", {}).get("name", "unknown")
tool_call_id = tool_calls[idx].get("id", f"unknown_{idx}")
try:
async for event in _yield_tool_call(tool_calls, idx, session, session_lock):
await queue.put(event)
except (orjson.JSONDecodeError, KeyError, TypeError) as e:
logger.error(
f"Failed to parse tool call {idx} ({tool_name}): {e}",
exc_info=True,
)
retryable_errors.append(e)
except Exception as e:
# Infrastructure / setup errors — emit an error output so the
# client always sees a terminal event and doesn't hang.
logger.error(f"Tool call {idx} ({tool_name}) failed: {e}", exc_info=True)
await queue.put(
StreamToolOutputAvailable(
toolCallId=tool_call_id,
toolName=tool_name,
output=ErrorResponse(
message=f"Tool execution failed: {e!s}",
error=type(e).__name__,
session_id=session.session_id,
).model_dump_json(),
success=False,
)
)
finally:
await queue.put(None) # sentinel
tasks = [asyncio.create_task(_run_tool(idx)) for idx in range(n_tools)]
try:
finished = 0
while finished < n_tools:
event = await queue.get()
if event is None:
finished += 1
else:
yield event
if retryable_errors:
if len(retryable_errors) > 1:
logger.warning(
f"{len(retryable_errors)} tool calls had retryable errors; "
f"re-raising first to trigger retry"
)
raise retryable_errors[0]
finally:
for t in tasks:
if not t.done():
t.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
async def _yield_tool_call(
tool_calls: list[dict[str, Any]],
yield_idx: int,
session: ChatSession,
session_lock: asyncio.Lock | None = None,
) -> AsyncGenerator[StreamBaseResponse, None]:
"""
Yield a tool call and its execution result.
@@ -1495,7 +1420,8 @@ async def _yield_tool_call(
"check back in a few minutes."
)
# Track appended message for rollback on failure
# Track appended messages for rollback on failure
assistant_message: ChatMessage | None = None
pending_message: ChatMessage | None = None
# Wrap session save and task creation in try-except to release lock on failure
@@ -1510,24 +1436,22 @@ async def _yield_tool_call(
operation_id=operation_id,
)
# Attach tool_call and save pending result — lock serialises
# concurrent session mutations during parallel execution.
async def _save_pending() -> None:
nonlocal pending_message
session.add_tool_call_to_current_turn(tool_calls[yield_idx])
pending_message = ChatMessage(
role="tool",
content=OperationPendingResponse(
message=pending_msg,
operation_id=operation_id,
tool_name=tool_name,
).model_dump_json(),
tool_call_id=tool_call_id,
)
session.messages.append(pending_message)
await upsert_chat_session(session)
# Attach the tool_call to the current turn's assistant message
# (or create one if this is a tool-only response with no text).
session.add_tool_call_to_current_turn(tool_calls[yield_idx])
await _with_optional_lock(session_lock, _save_pending)
# Then save pending tool result
pending_message = ChatMessage(
role="tool",
content=OperationPendingResponse(
message=pending_msg,
operation_id=operation_id,
tool_name=tool_name,
).model_dump_json(),
tool_call_id=tool_call_id,
)
session.messages.append(pending_message)
await upsert_chat_session(session)
logger.info(
f"Saved pending operation {operation_id} (task_id={task_id}) "
f"for tool {tool_name} in session {session.session_id}"
@@ -1551,13 +1475,19 @@ async def _yield_tool_call(
# Associate the asyncio task with the stream registry task
await stream_registry.set_task_asyncio_task(task_id, bg_task)
except Exception as e:
# Roll back appended messages — use identity-based removal so
# it works even when other parallel tools have appended after us.
async def _rollback() -> None:
if pending_message and pending_message in session.messages:
session.messages.remove(pending_message)
await _with_optional_lock(session_lock, _rollback)
# Roll back appended messages to prevent data corruption on subsequent saves
if (
pending_message
and session.messages
and session.messages[-1] == pending_message
):
session.messages.pop()
if (
assistant_message
and session.messages
and session.messages[-1] == assistant_message
):
session.messages.pop()
# Release the Redis lock since the background task won't be spawned
await _mark_operation_completed(tool_call_id)

View File

@@ -117,7 +117,7 @@ async def create_task(
if user_id:
log_meta["user_id"] = user_id
logger.info(
logger.debug(
f"[TIMING] create_task STARTED, task={task_id}, session={session_id}, user={user_id}",
extra={"json_fields": log_meta},
)
@@ -135,7 +135,7 @@ async def create_task(
redis_start = time.perf_counter()
redis = await get_redis_async()
redis_time = (time.perf_counter() - redis_start) * 1000
logger.info(
logger.debug(
f"[TIMING] get_redis_async took {redis_time:.1f}ms",
extra={"json_fields": {**log_meta, "duration_ms": redis_time}},
)
@@ -158,7 +158,7 @@ async def create_task(
},
)
hset_time = (time.perf_counter() - hset_start) * 1000
logger.info(
logger.debug(
f"[TIMING] redis.hset took {hset_time:.1f}ms",
extra={"json_fields": {**log_meta, "duration_ms": hset_time}},
)
@@ -169,7 +169,7 @@ async def create_task(
await redis.set(op_key, task_id, ex=config.stream_ttl)
total_time = (time.perf_counter() - start_time) * 1000
logger.info(
logger.debug(
f"[TIMING] create_task COMPLETED in {total_time:.1f}ms; task={task_id}, session={session_id}",
extra={"json_fields": {**log_meta, "total_time_ms": total_time}},
)
@@ -230,7 +230,7 @@ async def publish_chunk(
in ("StreamStart", "StreamFinish", "StreamTextStart", "StreamTextEnd")
or total_time > 50
):
logger.info(
logger.debug(
f"[TIMING] publish_chunk {chunk_type} in {total_time:.1f}ms (xadd={xadd_time:.1f}ms)",
extra={
"json_fields": {
@@ -279,7 +279,7 @@ async def subscribe_to_task(
if user_id:
log_meta["user_id"] = user_id
logger.info(
logger.debug(
f"[TIMING] subscribe_to_task STARTED, task={task_id}, user={user_id}, last_msg={last_message_id}",
extra={"json_fields": {**log_meta, "last_message_id": last_message_id}},
)
@@ -289,14 +289,14 @@ async def subscribe_to_task(
meta_key = _get_task_meta_key(task_id)
meta: dict[Any, Any] = await redis.hgetall(meta_key) # type: ignore[misc]
hgetall_time = (time.perf_counter() - redis_start) * 1000
logger.info(
logger.debug(
f"[TIMING] Redis hgetall took {hgetall_time:.1f}ms",
extra={"json_fields": {**log_meta, "duration_ms": hgetall_time}},
)
if not meta:
elapsed = (time.perf_counter() - start_time) * 1000
logger.info(
logger.debug(
f"[TIMING] Task not found in Redis after {elapsed:.1f}ms",
extra={
"json_fields": {
@@ -335,7 +335,7 @@ async def subscribe_to_task(
xread_start = time.perf_counter()
messages = await redis.xread({stream_key: last_message_id}, block=0, count=1000)
xread_time = (time.perf_counter() - xread_start) * 1000
logger.info(
logger.debug(
f"[TIMING] Redis xread (replay) took {xread_time:.1f}ms, status={task_status}",
extra={
"json_fields": {
@@ -363,7 +363,7 @@ async def subscribe_to_task(
except Exception as e:
logger.warning(f"Failed to replay message: {e}")
logger.info(
logger.debug(
f"[TIMING] Replayed {replayed_count} messages, last_id={replay_last_id}",
extra={
"json_fields": {
@@ -376,7 +376,7 @@ async def subscribe_to_task(
# Step 2: If task is still running, start stream listener for live updates
if task_status == "running":
logger.info(
logger.debug(
"[TIMING] Task still running, starting _stream_listener",
extra={"json_fields": {**log_meta, "task_status": task_status}},
)
@@ -387,14 +387,14 @@ async def subscribe_to_task(
_listener_tasks[id(subscriber_queue)] = (task_id, listener_task)
else:
# Task is completed/failed - add finish marker
logger.info(
logger.debug(
f"[TIMING] Task already {task_status}, adding StreamFinish",
extra={"json_fields": {**log_meta, "task_status": task_status}},
)
await subscriber_queue.put(StreamFinish())
total_time = (time.perf_counter() - start_time) * 1000
logger.info(
logger.debug(
f"[TIMING] subscribe_to_task COMPLETED in {total_time:.1f}ms; task={task_id}, "
f"n_messages_replayed={replayed_count}",
extra={
@@ -433,7 +433,7 @@ async def _stream_listener(
if log_meta is None:
log_meta = {"component": "StreamRegistry", "task_id": task_id}
logger.info(
logger.debug(
f"[TIMING] _stream_listener STARTED, task={task_id}, last_id={last_replayed_id}",
extra={"json_fields": {**log_meta, "last_replayed_id": last_replayed_id}},
)
@@ -462,7 +462,7 @@ async def _stream_listener(
if messages:
msg_count = sum(len(msgs) for _, msgs in messages)
logger.info(
logger.debug(
f"[TIMING] xread #{xread_count} returned {msg_count} messages in {xread_time:.1f}ms",
extra={
"json_fields": {
@@ -475,7 +475,7 @@ async def _stream_listener(
)
elif xread_time > 1000:
# Only log timeouts (30s blocking)
logger.info(
logger.debug(
f"[TIMING] xread #{xread_count} timeout after {xread_time:.1f}ms",
extra={
"json_fields": {
@@ -526,7 +526,7 @@ async def _stream_listener(
if first_message_time is None:
first_message_time = time.perf_counter()
elapsed = (first_message_time - start_time) * 1000
logger.info(
logger.debug(
f"[TIMING] FIRST live message at {elapsed:.1f}ms, type={type(chunk).__name__}",
extra={
"json_fields": {
@@ -568,7 +568,7 @@ async def _stream_listener(
# Stop listening on finish
if isinstance(chunk, StreamFinish):
total_time = (time.perf_counter() - start_time) * 1000
logger.info(
logger.debug(
f"[TIMING] StreamFinish received in {total_time/1000:.1f}s; delivered={messages_delivered}",
extra={
"json_fields": {
@@ -587,7 +587,7 @@ async def _stream_listener(
except asyncio.CancelledError:
elapsed = (time.perf_counter() - start_time) * 1000
logger.info(
logger.debug(
f"[TIMING] _stream_listener CANCELLED after {elapsed:.1f}ms, delivered={messages_delivered}",
extra={
"json_fields": {
@@ -619,7 +619,7 @@ async def _stream_listener(
finally:
# Clean up listener task mapping on exit
total_time = (time.perf_counter() - start_time) * 1000
logger.info(
logger.debug(
f"[TIMING] _stream_listener FINISHED in {total_time/1000:.1f}s; task={task_id}, "
f"delivered={messages_delivered}, xread_count={xread_count}",
extra={
@@ -835,7 +835,7 @@ async def get_active_task_for_session(
f"for task {task_id[:8]}...: {exc}"
)
logger.info(
logger.debug(
f"[TASK_LOOKUP] Found running task {task_id[:8]}... for session {session_id[:8]}..."
)

View File

@@ -22,7 +22,6 @@ from .models import (
ClarificationNeededResponse,
ClarifyingQuestion,
ErrorResponse,
SuggestedGoalResponse,
ToolResponseBase,
)
@@ -187,28 +186,26 @@ class CreateAgentTool(BaseTool):
if decomposition_result.get("type") == "unachievable_goal":
suggested = decomposition_result.get("suggested_goal", "")
reason = decomposition_result.get("reason", "")
return SuggestedGoalResponse(
return ErrorResponse(
message=(
f"This goal cannot be accomplished with the available blocks. {reason}"
f"This goal cannot be accomplished with the available blocks. "
f"{reason} "
f"Suggestion: {suggested}"
),
suggested_goal=suggested,
reason=reason,
original_goal=description,
goal_type="unachievable",
error="unachievable_goal",
details={"suggested_goal": suggested, "reason": reason},
session_id=session_id,
)
if decomposition_result.get("type") == "vague_goal":
suggested = decomposition_result.get("suggested_goal", "")
reason = decomposition_result.get(
"reason", "The goal needs more specific details"
)
return SuggestedGoalResponse(
message="The goal is too vague to create a specific workflow.",
suggested_goal=suggested,
reason=reason,
original_goal=description,
goal_type="vague",
return ErrorResponse(
message=(
f"The goal is too vague to create a specific workflow. "
f"Suggestion: {suggested}"
),
error="vague_goal",
details={"suggested_goal": suggested},
session_id=session_id,
)

View File

@@ -1,142 +0,0 @@
"""Tests for CreateAgentTool response types."""
from unittest.mock import AsyncMock, patch
import pytest
from backend.copilot.tools.create_agent import CreateAgentTool
from backend.copilot.tools.models import (
ClarificationNeededResponse,
ErrorResponse,
SuggestedGoalResponse,
)
from ._test_data import make_session
_TEST_USER_ID = "test-user-create-agent"
@pytest.fixture
def tool():
return CreateAgentTool()
@pytest.fixture
def session():
return make_session(_TEST_USER_ID)
@pytest.mark.asyncio
async def test_missing_description_returns_error(tool, session):
"""Missing description returns ErrorResponse."""
result = await tool._execute(user_id=_TEST_USER_ID, session=session, description="")
assert isinstance(result, ErrorResponse)
assert result.error == "Missing description parameter"
@pytest.mark.asyncio
async def test_vague_goal_returns_suggested_goal_response(tool, session):
"""vague_goal decomposition result returns SuggestedGoalResponse, not ErrorResponse."""
vague_result = {
"type": "vague_goal",
"suggested_goal": "Monitor Twitter mentions for a specific keyword and send a daily digest email",
}
with (
patch(
"backend.copilot.tools.create_agent.get_all_relevant_agents_for_generation",
new_callable=AsyncMock,
return_value=[],
),
patch(
"backend.copilot.tools.create_agent.decompose_goal",
new_callable=AsyncMock,
return_value=vague_result,
),
):
result = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
description="monitor social media",
)
assert isinstance(result, SuggestedGoalResponse)
assert result.goal_type == "vague"
assert result.suggested_goal == vague_result["suggested_goal"]
assert result.original_goal == "monitor social media"
assert result.reason == "The goal needs more specific details"
assert not isinstance(result, ErrorResponse)
@pytest.mark.asyncio
async def test_unachievable_goal_returns_suggested_goal_response(tool, session):
"""unachievable_goal decomposition result returns SuggestedGoalResponse, not ErrorResponse."""
unachievable_result = {
"type": "unachievable_goal",
"suggested_goal": "Summarize the latest news articles on a topic and send them by email",
"reason": "There are no blocks for mind-reading.",
}
with (
patch(
"backend.copilot.tools.create_agent.get_all_relevant_agents_for_generation",
new_callable=AsyncMock,
return_value=[],
),
patch(
"backend.copilot.tools.create_agent.decompose_goal",
new_callable=AsyncMock,
return_value=unachievable_result,
),
):
result = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
description="read my mind",
)
assert isinstance(result, SuggestedGoalResponse)
assert result.goal_type == "unachievable"
assert result.suggested_goal == unachievable_result["suggested_goal"]
assert result.original_goal == "read my mind"
assert result.reason == unachievable_result["reason"]
assert not isinstance(result, ErrorResponse)
@pytest.mark.asyncio
async def test_clarifying_questions_returns_clarification_needed_response(
tool, session
):
"""clarifying_questions decomposition result returns ClarificationNeededResponse."""
clarifying_result = {
"type": "clarifying_questions",
"questions": [
{
"question": "What platform should be monitored?",
"keyword": "platform",
"example": "Twitter, Reddit",
}
],
}
with (
patch(
"backend.copilot.tools.create_agent.get_all_relevant_agents_for_generation",
new_callable=AsyncMock,
return_value=[],
),
patch(
"backend.copilot.tools.create_agent.decompose_goal",
new_callable=AsyncMock,
return_value=clarifying_result,
),
):
result = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
description="monitor social media and alert me",
)
assert isinstance(result, ClarificationNeededResponse)
assert len(result.questions) == 1
assert result.questions[0].keyword == "platform"

View File

@@ -2,7 +2,7 @@
from datetime import datetime
from enum import Enum
from typing import Any, Literal
from typing import Any
from pydantic import BaseModel, Field
@@ -50,8 +50,6 @@ class ResponseType(str, Enum):
# Feature request types
FEATURE_REQUEST_SEARCH = "feature_request_search"
FEATURE_REQUEST_CREATED = "feature_request_created"
# Goal refinement
SUGGESTED_GOAL = "suggested_goal"
# Base response model
@@ -298,22 +296,6 @@ class ClarificationNeededResponse(ToolResponseBase):
questions: list[ClarifyingQuestion] = Field(default_factory=list)
class SuggestedGoalResponse(ToolResponseBase):
"""Response when the goal needs refinement with a suggested alternative."""
type: ResponseType = ResponseType.SUGGESTED_GOAL
suggested_goal: str = Field(description="The suggested alternative goal")
reason: str = Field(
default="", description="Why the original goal needs refinement"
)
original_goal: str = Field(
default="", description="The user's original goal for context"
)
goal_type: Literal["vague", "unachievable"] = Field(
default="vague", description="Type: 'vague' or 'unachievable'"
)
# Documentation search models
class DocSearchResult(BaseModel):
"""A single documentation search result."""

View File

@@ -212,6 +212,8 @@ class ReadWorkspaceFileTool(BaseTool):
"Specify either file_id or path to identify the file. "
"For small text files, returns content directly. "
"For large or binary files, returns metadata and a download URL. "
"Optionally use 'save_to_path' to copy the file to the ephemeral "
"working directory for processing with bash_exec or SDK tools. "
"Paths are scoped to the current session by default. "
"Use /sessions/<session_id>/... for cross-session access."
)
@@ -232,6 +234,15 @@ class ReadWorkspaceFileTool(BaseTool):
"Scoped to current session by default."
),
},
"save_to_path": {
"type": "string",
"description": (
"If provided, save the file to this path in the ephemeral "
"working directory (e.g., '/tmp/copilot-.../data.csv') "
"so it can be processed with bash_exec or SDK tools. "
"The file content is still returned in the response."
),
},
"force_download_url": {
"type": "boolean",
"description": (
@@ -275,6 +286,7 @@ class ReadWorkspaceFileTool(BaseTool):
file_id: Optional[str] = kwargs.get("file_id")
path: Optional[str] = kwargs.get("path")
save_to_path: Optional[str] = kwargs.get("save_to_path")
force_download_url: bool = kwargs.get("force_download_url", False)
if not file_id and not path:
@@ -283,6 +295,22 @@ class ReadWorkspaceFileTool(BaseTool):
session_id=session_id,
)
# Validate save_to_path is within ephemeral workspace
if save_to_path:
import os
from backend.copilot.tools.sandbox import WORKSPACE_PREFIX
real_save = os.path.realpath(save_to_path)
if not real_save.startswith(WORKSPACE_PREFIX):
return ErrorResponse(
message=(
f"save_to_path must be within the ephemeral working "
f"directory ({WORKSPACE_PREFIX})"
),
session_id=session_id,
)
try:
workspace = await workspace_db().get_or_create_workspace(user_id)
# Pass session_id for session-scoped file access
@@ -308,6 +336,15 @@ class ReadWorkspaceFileTool(BaseTool):
)
target_file_id = file_info.id
# If save_to_path requested, always read and save the file
if save_to_path:
import os
content = await manager.read_file_by_id(target_file_id)
os.makedirs(os.path.dirname(save_to_path), exist_ok=True)
with open(save_to_path, "wb") as f:
f.write(content)
# Decide whether to return inline content or metadata+URL
is_small_file = file_info.size_bytes <= self.MAX_INLINE_SIZE_BYTES
is_text_file = self._is_text_mime_type(file_info.mime_type)
@@ -327,13 +364,16 @@ class ReadWorkspaceFileTool(BaseTool):
content = await manager.read_file_by_id(target_file_id)
content_b64 = base64.b64encode(content).decode("utf-8")
msg = f"Successfully read file: {file_info.name}"
if save_to_path:
msg += f" (also saved to {save_to_path})"
return WorkspaceFileContentResponse(
file_id=file_info.id,
name=file_info.name,
path=file_info.path,
mime_type=file_info.mime_type,
content_base64=content_b64,
message=f"Successfully read file: {file_info.name}",
message=msg,
session_id=session_id,
)
@@ -356,6 +396,11 @@ class ReadWorkspaceFileTool(BaseTool):
except Exception:
pass # Preview is optional
msg = f"File: {file_info.name} ({file_info.size_bytes} bytes)."
if save_to_path:
msg += f" Saved to {save_to_path}."
else:
msg += " Use download_url to retrieve content."
return WorkspaceFileMetadataResponse(
file_id=file_info.id,
name=file_info.name,
@@ -364,7 +409,7 @@ class ReadWorkspaceFileTool(BaseTool):
size_bytes=file_info.size_bytes,
download_url=download_url,
preview=preview,
message=f"File: {file_info.name} ({file_info.size_bytes} bytes). Use download_url to retrieve content.",
message=msg,
session_id=session_id,
)
@@ -395,7 +440,9 @@ class WriteWorkspaceFileTool(BaseTool):
"Write or create a file in the user's persistent workspace (cloud storage). "
"These files survive across sessions. "
"For ephemeral session files, use the SDK Write tool instead. "
"Provide the content as a base64-encoded string. "
"Provide content as plain text via 'content', OR base64-encoded via "
"'content_base64', OR copy a file from the ephemeral working directory "
"via 'source_path'. Exactly one of these three is required. "
f"Maximum file size is {Config().max_file_size_mb}MB. "
"Files are saved to the current session's folder by default. "
"Use /sessions/<session_id>/... for cross-session access."
@@ -410,9 +457,30 @@ class WriteWorkspaceFileTool(BaseTool):
"type": "string",
"description": "Name for the file (e.g., 'report.pdf')",
},
"content": {
"type": "string",
"description": (
"Plain text content to write. Use this for text files "
"(code, configs, documents, etc.). "
"Mutually exclusive with content_base64 and source_path."
),
},
"content_base64": {
"type": "string",
"description": "Base64-encoded file content",
"description": (
"Base64-encoded file content. Use this for binary files "
"(images, PDFs, etc.). "
"Mutually exclusive with content and source_path."
),
},
"source_path": {
"type": "string",
"description": (
"Path to a file in the ephemeral working directory to "
"copy to workspace (e.g., '/tmp/copilot-.../output.csv'). "
"Use this to persist files created by bash_exec or SDK Write. "
"Mutually exclusive with content and content_base64."
),
},
"path": {
"type": "string",
@@ -434,7 +502,7 @@ class WriteWorkspaceFileTool(BaseTool):
"description": "Whether to overwrite if file exists at path (default: false)",
},
},
"required": ["filename", "content_base64"],
"required": ["filename"],
}
@property
@@ -456,7 +524,9 @@ class WriteWorkspaceFileTool(BaseTool):
)
filename: str = kwargs.get("filename", "")
content_b64: str = kwargs.get("content_base64", "")
content_text: Optional[str] = kwargs.get("content")
content_b64: Optional[str] = kwargs.get("content_base64")
source_path: Optional[str] = kwargs.get("source_path")
path: Optional[str] = kwargs.get("path")
mime_type: Optional[str] = kwargs.get("mime_type")
overwrite: bool = kwargs.get("overwrite", False)
@@ -467,20 +537,66 @@ class WriteWorkspaceFileTool(BaseTool):
session_id=session_id,
)
if not content_b64:
# Resolve content from one of three sources
sources_provided = sum(
bool(x) for x in [content_text, content_b64, source_path]
)
if sources_provided == 0:
return ErrorResponse(
message="Please provide content_base64",
message="Please provide one of: content, content_base64, or source_path",
session_id=session_id,
)
if sources_provided > 1:
return ErrorResponse(
message="Provide only one of: content, content_base64, or source_path",
session_id=session_id,
)
# Decode content
try:
content = base64.b64decode(content_b64)
except Exception:
return ErrorResponse(
message="Invalid base64-encoded content",
session_id=session_id,
)
content: bytes
if source_path:
# Read from ephemeral working directory
import os
from backend.copilot.tools.sandbox import WORKSPACE_PREFIX
real_path = os.path.realpath(source_path)
if not real_path.startswith(WORKSPACE_PREFIX):
return ErrorResponse(
message=(
f"source_path must be within the ephemeral working "
f"directory ({WORKSPACE_PREFIX})"
),
session_id=session_id,
)
try:
with open(real_path, "rb") as f:
content = f.read()
except FileNotFoundError:
return ErrorResponse(
message=f"Source file not found: {source_path}",
session_id=session_id,
)
except Exception as e:
return ErrorResponse(
message=f"Failed to read source file: {e}",
session_id=session_id,
)
elif content_b64:
# Decode base64 content
try:
content = base64.b64decode(content_b64)
except Exception:
# Fallback: treat as plain text if base64 decode fails
# (LLMs sometimes send plain text in the content_base64 field)
logger.warning(
"[workspace] content_base64 is not valid base64, "
"treating as plain text"
)
content = content_b64.encode("utf-8")
else:
# Plain text content
assert content_text is not None
content = content_text.encode("utf-8")
# Check size
max_file_size = Config().max_file_size_mb * 1024 * 1024

View File

@@ -30,16 +30,6 @@ pnpm format
pnpm types
```
### Pre-completion Checks (MANDATORY)
After making **any** code changes in the frontend, you MUST run the following commands **in order** before reporting work as done, creating commits, or opening PRs:
1. `pnpm format` — auto-fix formatting issues
2. `pnpm lint` — check for lint errors; fix any that appear
3. `pnpm types` — check for type errors; fix any that appear
Do NOT skip these steps. If any command reports errors, fix them and re-run until clean. Only then may you consider the task complete. If typing keeps failing, stop and ask the user.
### Code Style
- Fully capitalize acronyms in symbols, e.g. `graphID`, `useBackendAPI`
@@ -84,4 +74,3 @@ See @CONTRIBUTING.md for complete patterns. Quick reference:
- Do not use `useCallback` or `useMemo` unless asked to optimise a given function
- Do not type hook returns, let Typescript infer as much as possible
- Never type with `any` unless a variable/attribute can ACTUALLY be of any type
- avoid index and barrel files

View File

@@ -169,10 +169,7 @@ export const ChatMessagesContainer = ({
<ConversationContent className="flex flex-1 flex-col gap-6 px-3 py-6">
{headerSlot}
{isLoading && messages.length === 0 && (
<div
className="flex flex-1 items-center justify-center"
style={{ minHeight: "calc(100vh - 12rem)" }}
>
<div className="flex min-h-full flex-1 items-center justify-center">
<LoadingSpinner className="text-neutral-600" />
</div>
)}

View File

@@ -13,7 +13,7 @@ export function MorphingTextAnimation({ text, className }: Props) {
<div className={cn(className)}>
<AnimatePresence mode="popLayout" initial={false}>
<motion.div key={text} className="whitespace-nowrap">
<motion.span className="inline-flex overflow-hidden">
<motion.span className="inline-flex gap-0 overflow-hidden">
{letters.map((char, index) => (
<motion.span
key={`${text}-${index}`}

View File

@@ -26,7 +26,6 @@ import {
} from "./components/ClarificationQuestionsCard";
import sparklesImg from "./components/MiniGame/assets/sparkles.png";
import { MiniGame } from "./components/MiniGame/MiniGame";
import { SuggestedGoalCard } from "./components/SuggestedGoalCard";
import {
AccordionIcon,
formatMaybeJson,
@@ -39,7 +38,6 @@ import {
isOperationInProgressOutput,
isOperationPendingOutput,
isOperationStartedOutput,
isSuggestedGoalOutput,
ToolIcon,
truncateText,
type CreateAgentToolOutput,
@@ -79,13 +77,6 @@ function getAccordionMeta(output: CreateAgentToolOutput) {
expanded: true,
};
}
if (isSuggestedGoalOutput(output)) {
return {
icon,
title: "Goal needs refinement",
expanded: true,
};
}
if (
isOperationStartedOutput(output) ||
isOperationPendingOutput(output) ||
@@ -134,13 +125,8 @@ export function CreateAgentTool({ part }: Props) {
isAgentPreviewOutput(output) ||
isAgentSavedOutput(output) ||
isClarificationNeededOutput(output) ||
isSuggestedGoalOutput(output) ||
isErrorOutput(output));
function handleUseSuggestedGoal(goal: string) {
onSend(`Please create an agent with this goal: ${goal}`);
}
function handleClarificationAnswers(answers: Record<string, string>) {
const questions =
output && isClarificationNeededOutput(output)
@@ -259,16 +245,6 @@ export function CreateAgentTool({ part }: Props) {
/>
)}
{isSuggestedGoalOutput(output) && (
<SuggestedGoalCard
message={output.message}
suggestedGoal={output.suggested_goal}
reason={output.reason}
goalType={output.goal_type ?? "vague"}
onUseSuggestedGoal={handleUseSuggestedGoal}
/>
)}
{isErrorOutput(output) && (
<ContentGrid>
<ContentMessage>{output.message}</ContentMessage>
@@ -282,22 +258,6 @@ export function CreateAgentTool({ part }: Props) {
{formatMaybeJson(output.details)}
</ContentCodeBlock>
)}
<div className="flex gap-2">
<Button
variant="outline"
size="small"
onClick={() => onSend("Please try creating the agent again.")}
>
Try again
</Button>
<Button
variant="outline"
size="small"
onClick={() => onSend("Can you help me simplify this goal?")}
>
Simplify goal
</Button>
</div>
</ContentGrid>
)}
</ToolAccordion>

View File

@@ -10,10 +10,17 @@ export function MiniGame() {
const { canvasRef, activeMode, showOverlay, score, highScore, onContinue } =
useMiniGame();
const isRunActive =
activeMode === "run" || activeMode === "idle" || activeMode === "over";
let overlayText: string | undefined;
let buttonLabel = "Continue";
if (activeMode === "idle") {
buttonLabel = "Start";
} else if (activeMode === "boss-intro") {
overlayText = "Face the bandit!";
} else if (activeMode === "boss-defeated") {
overlayText = "Great job, keep on going";
} else if (activeMode === "over") {
overlayText = `Score: ${score} / Record: ${highScore}`;
buttonLabel = "Retry";
@@ -22,7 +29,16 @@ export function MiniGame() {
return (
<div className="flex flex-col gap-2">
<p className="text-sm font-medium text-purple-500">
<Key>WASD</Key> to move
{isRunActive ? (
<>
Run mode: <Key>Space</Key> to jump
</>
) : (
<>
Duel mode: <Key></Key> to move · <Key>Z</Key> to attack ·{" "}
<Key>X</Key> to block · <Key>Space</Key> to jump
</>
)}
</p>
<div className="relative w-full overflow-hidden rounded-md border border-accent bg-background text-foreground">
<canvas

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.6 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB

View File

@@ -1,63 +0,0 @@
"use client";
import { Button } from "@/components/atoms/Button/Button";
import { Text } from "@/components/atoms/Text/Text";
import { ArrowRightIcon, LightbulbIcon } from "@phosphor-icons/react";
interface Props {
message: string;
suggestedGoal: string;
reason?: string;
goalType: string;
onUseSuggestedGoal: (goal: string) => void;
}
export function SuggestedGoalCard({
message,
suggestedGoal,
reason,
goalType,
onUseSuggestedGoal,
}: Props) {
return (
<div className="rounded-xl border border-amber-200 bg-amber-50/50 p-4">
<div className="flex items-start gap-3">
<LightbulbIcon
size={20}
weight="fill"
className="mt-0.5 text-amber-600"
/>
<div className="flex-1 space-y-3">
<div>
<Text variant="body-medium" className="font-medium text-slate-900">
{goalType === "unachievable"
? "Goal cannot be accomplished"
: "Goal needs more detail"}
</Text>
<Text variant="small" className="text-slate-600">
{reason || message}
</Text>
</div>
<div className="rounded-lg border border-amber-300 bg-white p-3">
<Text variant="small" className="mb-1 font-semibold text-amber-800">
Suggested alternative:
</Text>
<Text variant="body-medium" className="text-slate-900">
{suggestedGoal}
</Text>
</div>
<Button
onClick={() => onUseSuggestedGoal(suggestedGoal)}
variant="primary"
>
<span className="inline-flex items-center gap-1.5">
Use this goal <ArrowRightIcon size={14} weight="bold" />
</span>
</Button>
</div>
</div>
</div>
);
}

View File

@@ -6,7 +6,6 @@ import type { OperationInProgressResponse } from "@/app/api/__generated__/models
import type { OperationPendingResponse } from "@/app/api/__generated__/models/operationPendingResponse";
import type { OperationStartedResponse } from "@/app/api/__generated__/models/operationStartedResponse";
import { ResponseType } from "@/app/api/__generated__/models/responseType";
import type { SuggestedGoalResponse } from "@/app/api/__generated__/models/suggestedGoalResponse";
import {
PlusCircleIcon,
PlusIcon,
@@ -22,7 +21,6 @@ export type CreateAgentToolOutput =
| AgentPreviewResponse
| AgentSavedResponse
| ClarificationNeededResponse
| SuggestedGoalResponse
| ErrorResponse;
function parseOutput(output: unknown): CreateAgentToolOutput | null {
@@ -45,7 +43,6 @@ function parseOutput(output: unknown): CreateAgentToolOutput | null {
type === ResponseType.agent_preview ||
type === ResponseType.agent_saved ||
type === ResponseType.clarification_needed ||
type === ResponseType.suggested_goal ||
type === ResponseType.error
) {
return output as CreateAgentToolOutput;
@@ -58,7 +55,6 @@ function parseOutput(output: unknown): CreateAgentToolOutput | null {
if ("agent_id" in output && "library_agent_id" in output)
return output as AgentSavedResponse;
if ("questions" in output) return output as ClarificationNeededResponse;
if ("suggested_goal" in output) return output as SuggestedGoalResponse;
if ("error" in output || "details" in output)
return output as ErrorResponse;
}
@@ -118,14 +114,6 @@ export function isClarificationNeededOutput(
);
}
export function isSuggestedGoalOutput(
output: CreateAgentToolOutput,
): output is SuggestedGoalResponse {
return (
output.type === ResponseType.suggested_goal || "suggested_goal" in output
);
}
export function isErrorOutput(
output: CreateAgentToolOutput,
): output is ErrorResponse {
@@ -151,7 +139,6 @@ export function getAnimationText(part: {
if (isAgentSavedOutput(output)) return `Saved ${output.agent_name}`;
if (isAgentPreviewOutput(output)) return `Preview "${output.agent_name}"`;
if (isClarificationNeededOutput(output)) return "Needs clarification";
if (isSuggestedGoalOutput(output)) return "Goal needs refinement";
return "Error creating agent";
}
case "output-error":

View File

@@ -1,6 +1,5 @@
import {
getGetV2ListSessionsQueryKey,
postV2CancelSessionTask,
useDeleteV2DeleteSession,
useGetV2ListSessions,
} from "@/app/api/__generated__/endpoints/chat/chat";
@@ -9,7 +8,6 @@ import { useBreakpoint } from "@/lib/hooks/useBreakpoint";
import { useSupabase } from "@/lib/supabase/hooks/useSupabase";
import { useChat } from "@ai-sdk/react";
import { useQueryClient } from "@tanstack/react-query";
import type { UIMessage } from "ai";
import { DefaultChatTransport } from "ai";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { useChatSession } from "./useChatSession";
@@ -17,24 +15,6 @@ import { useLongRunningToolPolling } from "./hooks/useLongRunningToolPolling";
const STREAM_START_TIMEOUT_MS = 12_000;
/** Mark any in-progress tool parts as completed/errored so spinners stop. */
function resolveInProgressTools(
messages: UIMessage[],
outcome: "completed" | "cancelled",
): UIMessage[] {
return messages.map((msg) => ({
...msg,
parts: msg.parts.map((part) =>
"state" in part &&
(part.state === "input-streaming" || part.state === "input-available")
? outcome === "cancelled"
? { ...part, state: "output-error" as const, errorText: "Cancelled" }
: { ...part, state: "output-available" as const, output: "" }
: part,
),
}));
}
export function useCopilotPage() {
const { isUserLoading, isLoggedIn } = useSupabase();
const [isDrawerOpen, setIsDrawerOpen] = useState(false);
@@ -115,7 +95,7 @@ export function useCopilotPage() {
const {
messages,
sendMessage,
stop: sdkStop,
stop,
status,
error,
setMessages,
@@ -128,36 +108,6 @@ export function useCopilotPage() {
// call resumeStream() manually after hydration + active_stream detection.
});
// Wrap AI SDK's stop() to also cancel the backend executor task.
// sdkStop() aborts the SSE fetch instantly (UI feedback), then we fire
// the cancel API to actually stop the executor and wait for confirmation.
async function stop() {
sdkStop();
setMessages((prev) => resolveInProgressTools(prev, "cancelled"));
if (!sessionId) return;
try {
const res = await postV2CancelSessionTask(sessionId);
if (
res.status === 200 &&
"reason" in res.data &&
res.data.reason === "cancel_published_not_confirmed"
) {
toast({
title: "Stop may take a moment",
description:
"The cancel was sent but not yet confirmed. The task should stop shortly.",
});
}
} catch {
toast({
title: "Could not stop the task",
description: "The task may still be running in the background.",
variant: "destructive",
});
}
}
// Abort the stream if the backend doesn't start sending data within 12s.
const stopRef = useRef(stop);
stopRef.current = stop;
@@ -202,18 +152,6 @@ export function useCopilotPage() {
resumeStream();
}, [hasActiveStream, sessionId, hydratedMessages, status, resumeStream]);
// When the stream finishes, resolve any tool parts still showing spinners.
// This can happen if the backend didn't emit StreamToolOutputAvailable for
// a tool call before sending StreamFinish (e.g. SDK built-in tools).
const prevStatusRef = useRef(status);
useEffect(() => {
const prev = prevStatusRef.current;
prevStatusRef.current = status;
if (prev === "streaming" && status === "ready") {
setMessages((msgs) => resolveInProgressTools(msgs, "completed"));
}
}, [status, setMessages]);
// Poll session endpoint when a long-running tool (create_agent, edit_agent)
// is in progress. When the backend completes, the session data will contain
// the final tool output — this hook detects the change and updates messages.

View File

@@ -1052,7 +1052,6 @@
{
"$ref": "#/components/schemas/ClarificationNeededResponse"
},
{ "$ref": "#/components/schemas/SuggestedGoalResponse" },
{ "$ref": "#/components/schemas/BlockListResponse" },
{ "$ref": "#/components/schemas/BlockDetailsResponse" },
{ "$ref": "#/components/schemas/BlockOutputResponse" },
@@ -1263,44 +1262,6 @@
}
}
},
"/api/chat/sessions/{session_id}/cancel": {
"post": {
"tags": ["v2", "chat", "chat"],
"summary": "Cancel Session Task",
"description": "Cancel the active streaming task for a session.\n\nPublishes a cancel event to the executor via RabbitMQ FANOUT, then\npolls Redis until the task status flips from ``running`` or a timeout\n(5 s) is reached. Returns only after the cancellation is confirmed.",
"operationId": "postV2CancelSessionTask",
"security": [{ "HTTPBearerJWT": [] }],
"parameters": [
{
"name": "session_id",
"in": "path",
"required": true,
"schema": { "type": "string", "title": "Session Id" }
}
],
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/CancelTaskResponse" }
}
}
},
"401": {
"$ref": "#/components/responses/HTTP401NotAuthenticatedError"
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/HTTPValidationError" }
}
}
}
}
}
},
"/api/chat/sessions/{session_id}/stream": {
"get": {
"tags": ["v2", "chat", "chat"],
@@ -7575,23 +7536,6 @@
"required": ["file"],
"title": "Body_postV2Upload submission media"
},
"CancelTaskResponse": {
"properties": {
"cancelled": { "type": "boolean", "title": "Cancelled" },
"task_id": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Task Id"
},
"reason": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Reason"
}
},
"type": "object",
"required": ["cancelled"],
"title": "CancelTaskResponse",
"description": "Response model for the cancel task endpoint."
},
"ChangelogEntry": {
"properties": {
"version": { "type": "string", "title": "Version" },
@@ -10852,8 +10796,7 @@
"bash_exec",
"operation_status",
"feature_request_search",
"feature_request_created",
"suggested_goal"
"feature_request_created"
],
"title": "ResponseType",
"description": "Types of tool responses."
@@ -11734,47 +11677,6 @@
"enum": ["DRAFT", "PENDING", "APPROVED", "REJECTED"],
"title": "SubmissionStatus"
},
"SuggestedGoalResponse": {
"properties": {
"type": {
"$ref": "#/components/schemas/ResponseType",
"default": "suggested_goal"
},
"message": { "type": "string", "title": "Message" },
"session_id": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Session Id"
},
"suggested_goal": {
"type": "string",
"title": "Suggested Goal",
"description": "The suggested alternative goal"
},
"reason": {
"type": "string",
"title": "Reason",
"description": "Why the original goal needs refinement",
"default": ""
},
"original_goal": {
"type": "string",
"title": "Original Goal",
"description": "The user's original goal for context",
"default": ""
},
"goal_type": {
"type": "string",
"enum": ["vague", "unachievable"],
"title": "Goal Type",
"description": "Type: 'vague' or 'unachievable'",
"default": "vague"
}
},
"type": "object",
"required": ["message", "suggested_goal"],
"title": "SuggestedGoalResponse",
"description": "Response when the goal needs refinement with a suggested alternative."
},
"SuggestionsResponse": {
"properties": {
"otto_suggestions": {

View File

@@ -69,11 +69,12 @@ test.describe("Marketplace Creator Page Basic Functionality", () => {
await marketplacePage.getFirstCreatorProfile(page);
await firstCreatorProfile.click();
await page.waitForURL("**/marketplace/creator/**");
await page.waitForLoadState("networkidle").catch(() => {});
const firstAgent = page
.locator('[data-testid="store-card"]:visible')
.first();
await firstAgent.waitFor({ state: "visible", timeout: 15000 });
await firstAgent.waitFor({ state: "visible", timeout: 30000 });
await firstAgent.click();
await page.waitForURL("**/marketplace/agent/**");

View File

@@ -115,11 +115,18 @@ test.describe("Marketplace Basic Functionality", () => {
const searchTerm = page.getByText("DummyInput").first();
await isVisible(searchTerm);
await expect
.poll(() => marketplacePage.getSearchResultsCount(page), {
timeout: 15000,
})
.toBeGreaterThan(0);
await page.waitForLoadState("networkidle").catch(() => {});
await page
.waitForFunction(
() =>
document.querySelectorAll('[data-testid="store-card"]').length > 0,
{ timeout: 15000 },
)
.catch(() => console.log("No search results appeared within timeout"));
const results = await marketplacePage.getSearchResultsCount(page);
expect(results).toBeGreaterThan(0);
console.log("Complete search flow works correctly test passed ✅");
});
@@ -128,9 +135,7 @@ test.describe("Marketplace Basic Functionality", () => {
});
test.describe("Marketplace Edge Cases", () => {
test("Search for non-existent item renders search page correctly", async ({
page,
}) => {
test("Search for non-existent item shows no results", async ({ page }) => {
const marketplacePage = new MarketplacePage(page);
await marketplacePage.goto(page);
@@ -146,23 +151,9 @@ test.describe("Marketplace Edge Cases", () => {
const searchTerm = page.getByText("xyznonexistentitemxyz123");
await isVisible(searchTerm);
// The search page should render either results or a "No results found" message
await expect
.poll(
async () => {
const hasResults =
(await page.locator('[data-testid="store-card"]').count()) > 0;
const hasNoResultsMsg = await page
.getByText("No results found")
.isVisible();
return hasResults || hasNoResultsMsg;
},
{ timeout: 15000 },
)
.toBe(true);
const results = await marketplacePage.getSearchResultsCount(page);
expect(results).toBe(0);
console.log(
"Search for non-existent item renders search page correctly test passed ✅",
);
console.log("Search for non-existent item shows no results test passed ✅");
});
});

View File

@@ -125,8 +125,16 @@ export class BuildPage extends BasePage {
`[data-id="block-card-${blockCardId}"]`,
);
await blockCard.waitFor({ state: "visible", timeout: 10000 });
await blockCard.click();
try {
// Wait for the block card to be visible with a reasonable timeout
await blockCard.waitFor({ state: "visible", timeout: 10000 });
await blockCard.click();
} catch (error) {
console.log(
`Block ${block.name} (display: ${displayName}) returned from the API but not found in block list`,
);
console.log(`Error: ${error}`);
}
}
async hasBlock(_block: Block) {

View File

@@ -65,7 +65,7 @@ export class LoginPage {
await this.page.waitForLoadState("load", { timeout: 10_000 });
console.log("➡️ Navigating to /marketplace ...");
await this.page.goto("/marketplace", { timeout: 20_000 });
await this.page.goto("/marketplace", { timeout: 10_000 });
console.log("✅ Login process complete");
// If Wallet popover auto-opens, close it to avoid blocking account menu interactions

View File

@@ -9,12 +9,7 @@ export class MarketplacePage extends BasePage {
async goto(page: Page) {
await page.goto("/marketplace");
await page
.locator(
'[data-testid="store-card"], [data-testid="featured-store-card"]',
)
.first()
.waitFor({ state: "visible", timeout: 20000 });
await page.waitForLoadState("networkidle").catch(() => {});
}
async getMarketplaceTitle(page: Page) {
@@ -116,7 +111,7 @@ export class MarketplacePage extends BasePage {
async getFirstFeaturedAgent(page: Page) {
const { getId } = getSelectors(page);
const card = getId("featured-store-card").first();
await card.waitFor({ state: "visible", timeout: 15000 });
await card.waitFor({ state: "visible", timeout: 30000 });
return card;
}
@@ -124,14 +119,14 @@ export class MarketplacePage extends BasePage {
const card = this.page
.locator('[data-testid="store-card"]:visible')
.first();
await card.waitFor({ state: "visible", timeout: 15000 });
await card.waitFor({ state: "visible", timeout: 30000 });
return card;
}
async getFirstCreatorProfile(page: Page) {
const { getId } = getSelectors(page);
const card = getId("creator-card").first();
await card.waitFor({ state: "visible", timeout: 15000 });
await card.waitFor({ state: "visible", timeout: 30000 });
return card;
}

View File

@@ -45,9 +45,8 @@ export async function isEnabled(el: Locator) {
}
export async function hasMinCount(el: Locator, minCount: number) {
await expect
.poll(async () => await el.count(), { timeout: 10000 })
.toBeGreaterThanOrEqual(minCount);
const count = await el.count();
expect(count).toBeGreaterThanOrEqual(minCount);
}
export async function matchesUrl(page: Page, pattern: RegExp) {