switch to vercel ai sdk protocol

This commit is contained in:
Swifty
2026-01-13 14:27:08 +01:00
parent 8fa972d26e
commit 5e6d5f97e5
9 changed files with 240 additions and 177 deletions

View File

@@ -1,3 +1,10 @@
"""
Response models for Vercel AI SDK UI Stream Protocol.
This module implements the AI SDK UI Stream Protocol (v1) for streaming chat responses.
See: https://ai-sdk.dev/docs/ai-sdk-ui/stream-protocol
"""
from enum import Enum
from typing import Any
@@ -5,97 +12,133 @@ from pydantic import BaseModel, Field
class ResponseType(str, Enum):
"""Types of streaming responses."""
"""Types of streaming responses following AI SDK protocol."""
TEXT_CHUNK = "text_chunk"
TEXT_ENDED = "text_ended"
TOOL_CALL = "tool_call"
TOOL_CALL_START = "tool_call_start"
TOOL_RESPONSE = "tool_response"
# Message lifecycle
START = "start"
FINISH = "finish"
# Text streaming
TEXT_START = "text-start"
TEXT_DELTA = "text-delta"
TEXT_END = "text-end"
# Tool interaction
TOOL_INPUT_START = "tool-input-start"
TOOL_INPUT_AVAILABLE = "tool-input-available"
TOOL_OUTPUT_AVAILABLE = "tool-output-available"
# Other
ERROR = "error"
USAGE = "usage"
STREAM_END = "stream_end"
class StreamBaseResponse(BaseModel):
"""Base response model for all streaming responses."""
type: ResponseType
timestamp: str | None = None
def to_sse(self) -> str:
"""Convert to SSE format."""
return f"data: {self.model_dump_json()}\n\n"
class StreamTextChunk(StreamBaseResponse):
"""Streaming text content from the assistant."""
type: ResponseType = ResponseType.TEXT_CHUNK
content: str = Field(..., description="Text content chunk")
# ========== Message Lifecycle ==========
class StreamToolCallStart(StreamBaseResponse):
class StreamStart(StreamBaseResponse):
"""Start of a new message."""
type: ResponseType = ResponseType.START
messageId: str = Field(..., description="Unique message ID")
class StreamFinish(StreamBaseResponse):
"""End of message/stream."""
type: ResponseType = ResponseType.FINISH
# ========== Text Streaming ==========
class StreamTextStart(StreamBaseResponse):
"""Start of a text block."""
type: ResponseType = ResponseType.TEXT_START
id: str = Field(..., description="Text block ID")
class StreamTextDelta(StreamBaseResponse):
"""Streaming text content delta."""
type: ResponseType = ResponseType.TEXT_DELTA
id: str = Field(..., description="Text block ID")
delta: str = Field(..., description="Text content delta")
class StreamTextEnd(StreamBaseResponse):
"""End of a text block."""
type: ResponseType = ResponseType.TEXT_END
id: str = Field(..., description="Text block ID")
# ========== Tool Interaction ==========
class StreamToolInputStart(StreamBaseResponse):
"""Tool call started notification."""
type: ResponseType = ResponseType.TOOL_CALL_START
tool_name: str = Field(..., description="Name of the tool that was executed")
tool_id: str = Field(..., description="Unique tool call ID")
type: ResponseType = ResponseType.TOOL_INPUT_START
toolCallId: str = Field(..., description="Unique tool call ID")
toolName: str = Field(..., description="Name of the tool being called")
class StreamToolCall(StreamBaseResponse):
"""Tool invocation notification."""
class StreamToolInputAvailable(StreamBaseResponse):
"""Tool input is ready for execution."""
type: ResponseType = ResponseType.TOOL_CALL
tool_id: str = Field(..., description="Unique tool call ID")
tool_name: str = Field(..., description="Name of the tool being called")
arguments: dict[str, Any] = Field(
default_factory=dict, description="Tool arguments"
type: ResponseType = ResponseType.TOOL_INPUT_AVAILABLE
toolCallId: str = Field(..., description="Unique tool call ID")
toolName: str = Field(..., description="Name of the tool being called")
input: dict[str, Any] = Field(
default_factory=dict, description="Tool input arguments"
)
class StreamToolExecutionResult(StreamBaseResponse):
class StreamToolOutputAvailable(StreamBaseResponse):
"""Tool execution result."""
type: ResponseType = ResponseType.TOOL_RESPONSE
tool_id: str = Field(..., description="Tool call ID this responds to")
tool_name: str = Field(..., description="Name of the tool that was executed")
result: str | dict[str, Any] = Field(..., description="Tool execution result")
type: ResponseType = ResponseType.TOOL_OUTPUT_AVAILABLE
toolCallId: str = Field(..., description="Tool call ID this responds to")
output: str | dict[str, Any] = Field(..., description="Tool execution output")
# Additional fields for internal use (not part of AI SDK spec but useful)
toolName: str | None = Field(
default=None, description="Name of the tool that was executed"
)
success: bool = Field(
default=True, description="Whether the tool execution succeeded"
)
# ========== Other ==========
class StreamUsage(StreamBaseResponse):
"""Token usage statistics."""
type: ResponseType = ResponseType.USAGE
prompt_tokens: int
completion_tokens: int
total_tokens: int
promptTokens: int = Field(..., description="Number of prompt tokens")
completionTokens: int = Field(..., description="Number of completion tokens")
totalTokens: int = Field(..., description="Total number of tokens")
class StreamError(StreamBaseResponse):
"""Error response."""
type: ResponseType = ResponseType.ERROR
message: str = Field(..., description="Error message")
errorText: str = Field(..., description="Error message text")
code: str | None = Field(default=None, description="Error code")
details: dict[str, Any] | None = Field(
default=None, description="Additional error details"
)
class StreamTextEnded(StreamBaseResponse):
"""Text streaming completed marker."""
type: ResponseType = ResponseType.TEXT_ENDED
class StreamEnd(StreamBaseResponse):
"""End of stream marker."""
type: ResponseType = ResponseType.STREAM_END
summary: dict[str, Any] | None = Field(
default=None, description="Stream summary statistics"
)

View File

@@ -225,6 +225,8 @@ async def stream_chat_post(
context=request.context,
):
yield chunk.to_sse()
# AI SDK protocol termination
yield "data: [DONE]\n\n"
return StreamingResponse(
event_generator(),
@@ -233,6 +235,7 @@ async def stream_chat_post(
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
"x-vercel-ai-ui-message-stream": "v1", # AI SDK protocol header
},
)
@@ -281,6 +284,8 @@ async def stream_chat_get(
session=session, # Pass pre-fetched session to avoid double-fetch
):
yield chunk.to_sse()
# AI SDK protocol termination
yield "data: [DONE]\n\n"
return StreamingResponse(
event_generator(),
@@ -289,6 +294,7 @@ async def stream_chat_get(
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
"x-vercel-ai-ui-message-stream": "v1", # AI SDK protocol header
},
)

View File

@@ -1,6 +1,5 @@
import logging
from collections.abc import AsyncGenerator
from datetime import UTC, datetime
from typing import Any
import orjson
@@ -22,13 +21,15 @@ from .model import create_chat_session as model_create_chat_session
from .model import get_chat_session, update_session_title, upsert_chat_session
from .response_model import (
StreamBaseResponse,
StreamEnd,
StreamError,
StreamTextChunk,
StreamTextEnded,
StreamToolCall,
StreamToolCallStart,
StreamToolExecutionResult,
StreamFinish,
StreamStart,
StreamTextDelta,
StreamTextEnd,
StreamTextStart,
StreamToolInputAvailable,
StreamToolInputStart,
StreamToolOutputAvailable,
StreamUsage,
)
from .tools import execute_tool, tools
@@ -377,6 +378,15 @@ async def stream_chat_completion(
accumulated_tool_calls: list[dict[str, Any]] = []
should_retry = False
# Generate unique IDs for AI SDK protocol
import uuid as uuid_module
message_id = str(uuid_module.uuid4())
text_block_id = str(uuid_module.uuid4())
# Yield message start
yield StreamStart(messageId=message_id)
# Create Langfuse generation for each LLM call, linked to the prompt
# Using v3 SDK: start_observation with as_type="generation"
generation = (
@@ -396,53 +406,63 @@ async def stream_chat_completion(
session=session,
tools=tools,
system_prompt=system_prompt,
text_block_id=text_block_id,
):
if isinstance(chunk, StreamTextChunk):
content = chunk.content or ""
if isinstance(chunk, StreamTextStart):
# Emit text-start before first text delta
if not has_received_text:
yield chunk
elif isinstance(chunk, StreamTextDelta):
delta = chunk.delta or ""
assert assistant_response.content is not None
assistant_response.content += content
assistant_response.content += delta
has_received_text = True
yield chunk
elif isinstance(chunk, StreamToolCallStart):
# Emit text_ended before first tool call, but only if we've received text
elif isinstance(chunk, StreamTextEnd):
# Emit text-end after text completes
if has_received_text and not text_streaming_ended:
yield StreamTextEnded()
text_streaming_ended = True
yield chunk
elif isinstance(chunk, StreamToolInputStart):
# Emit text-end before first tool call, but only if we've received text
if has_received_text and not text_streaming_ended:
yield StreamTextEnd(id=text_block_id)
text_streaming_ended = True
yield chunk
elif isinstance(chunk, StreamToolCall):
elif isinstance(chunk, StreamToolInputAvailable):
# Accumulate tool calls in OpenAI format
accumulated_tool_calls.append(
{
"id": chunk.tool_id,
"id": chunk.toolCallId,
"type": "function",
"function": {
"name": chunk.tool_name,
"arguments": orjson.dumps(chunk.arguments).decode("utf-8"),
"name": chunk.toolName,
"arguments": orjson.dumps(chunk.input).decode("utf-8"),
},
}
)
elif isinstance(chunk, StreamToolExecutionResult):
elif isinstance(chunk, StreamToolOutputAvailable):
result_content = (
chunk.result
if isinstance(chunk.result, str)
else orjson.dumps(chunk.result).decode("utf-8")
chunk.output
if isinstance(chunk.output, str)
else orjson.dumps(chunk.output).decode("utf-8")
)
tool_response_messages.append(
ChatMessage(
role="tool",
content=result_content,
tool_call_id=chunk.tool_id,
tool_call_id=chunk.toolCallId,
)
)
has_done_tool_call = True
# Track if any tool execution failed
if not chunk.success:
logger.warning(
f"Tool {chunk.tool_name} (ID: {chunk.tool_id}) execution failed"
f"Tool {chunk.toolName} (ID: {chunk.toolCallId}) execution failed"
)
yield chunk
elif isinstance(chunk, StreamEnd):
elif isinstance(chunk, StreamFinish):
if not has_done_tool_call:
has_yielded_end = True
yield chunk
@@ -451,9 +471,9 @@ async def stream_chat_completion(
elif isinstance(chunk, StreamUsage):
session.usage.append(
Usage(
prompt_tokens=chunk.prompt_tokens,
completion_tokens=chunk.completion_tokens,
total_tokens=chunk.total_tokens,
prompt_tokens=chunk.promptTokens,
completion_tokens=chunk.completionTokens,
total_tokens=chunk.totalTokens,
)
)
else:
@@ -495,15 +515,10 @@ async def stream_chat_completion(
f"Max retries ({config.max_retries}) exceeded: {error_message}"
)
error_response = StreamError(
message=error_message,
timestamp=datetime.now(UTC).isoformat(),
)
error_response = StreamError(errorText=error_message)
yield error_response
if not has_yielded_end:
yield StreamEnd(
timestamp=datetime.now(UTC).isoformat(),
)
yield StreamFinish()
return
# Handle retry outside of exception handler to avoid nesting
@@ -599,6 +614,7 @@ async def _stream_chat_chunks(
session: ChatSession,
tools: list[ChatCompletionToolParam],
system_prompt: str | None = None,
text_block_id: str | None = None,
) -> AsyncGenerator[StreamBaseResponse, None]:
"""
Pure streaming function for OpenAI chat completions with tool calling.
@@ -651,14 +667,17 @@ async def _stream_chat_chunks(
# Track which tool call indices have had their start event emitted
emitted_start_for_idx: set[int] = set()
# Track if we've started the text block
text_started = False
# Process the stream
chunk: ChatCompletionChunk
async for chunk in stream:
if chunk.usage:
yield StreamUsage(
prompt_tokens=chunk.usage.prompt_tokens,
completion_tokens=chunk.usage.completion_tokens,
total_tokens=chunk.usage.total_tokens,
promptTokens=chunk.usage.prompt_tokens,
completionTokens=chunk.usage.completion_tokens,
totalTokens=chunk.usage.total_tokens,
)
if chunk.choices:
@@ -672,10 +691,14 @@ async def _stream_chat_chunks(
# Handle content streaming
if delta.content:
# Stream the text chunk
text_response = StreamTextChunk(
content=delta.content,
timestamp=datetime.now(UTC).isoformat(),
# Emit text-start on first text content
if not text_started and text_block_id:
yield StreamTextStart(id=text_block_id)
text_started = True
# Stream the text delta
text_response = StreamTextDelta(
id=text_block_id or "",
delta=delta.content,
)
yield text_response
@@ -717,16 +740,15 @@ async def _stream_chat_chunks(
"arguments"
] += tc_chunk.function.arguments
# Emit StreamToolCallStart only after we have the tool call ID
# Emit StreamToolInputStart only after we have the tool call ID
if (
idx not in emitted_start_for_idx
and tool_calls[idx]["id"]
and tool_calls[idx]["function"]["name"]
):
yield StreamToolCallStart(
tool_id=tool_calls[idx]["id"],
tool_name=tool_calls[idx]["function"]["name"],
timestamp=datetime.now(UTC).isoformat(),
yield StreamToolInputStart(
toolCallId=tool_calls[idx]["id"],
toolName=tool_calls[idx]["function"]["name"],
)
emitted_start_for_idx.add(idx)
logger.info(f"Stream complete. Finish reason: {finish_reason}")
@@ -744,26 +766,18 @@ async def _stream_chat_chunks(
extra={"tool_call": tool_call},
)
yield StreamError(
message=f"Invalid tool call arguments for tool {tool_call.get('function', {}).get('name', 'unknown')}: {e}",
timestamp=datetime.now(UTC).isoformat(),
errorText=f"Invalid tool call arguments for tool {tool_call.get('function', {}).get('name', 'unknown')}: {e}",
)
# Re-raise to trigger retry logic in the parent function
raise
yield StreamEnd(
timestamp=datetime.now(UTC).isoformat(),
)
yield StreamFinish()
return
except Exception as e:
logger.error(f"Error in stream: {e!s}", exc_info=True)
error_response = StreamError(
message=str(e),
timestamp=datetime.now(UTC).isoformat(),
)
error_response = StreamError(errorText=str(e))
yield error_response
yield StreamEnd(
timestamp=datetime.now(UTC).isoformat(),
)
yield StreamFinish()
return
@@ -781,6 +795,7 @@ async def _yield_tool_call(
TypeError: If tool call structure is invalid
"""
tool_name = tool_calls[yield_idx]["function"]["name"]
tool_call_id = tool_calls[yield_idx]["id"]
logger.info(f"Yielding tool call: {tool_calls[yield_idx]}")
# Parse tool call arguments - handle empty arguments gracefully
@@ -790,17 +805,16 @@ async def _yield_tool_call(
else:
arguments = {}
yield StreamToolCall(
tool_id=tool_calls[yield_idx]["id"],
tool_name=tool_name,
arguments=arguments,
timestamp=datetime.now(UTC).isoformat(),
yield StreamToolInputAvailable(
toolCallId=tool_call_id,
toolName=tool_name,
input=arguments,
)
tool_execution_response: StreamToolExecutionResult = await execute_tool(
tool_execution_response: StreamToolOutputAvailable = await execute_tool(
tool_name=tool_name,
parameters=arguments,
tool_call_id=tool_calls[yield_idx]["id"],
tool_call_id=tool_call_id,
user_id=session.user_id,
session=session,
)

View File

@@ -5,10 +5,10 @@ import pytest
from . import service as chat_service
from .response_model import (
StreamEnd,
StreamError,
StreamTextChunk,
StreamToolExecutionResult,
StreamFinish,
StreamTextDelta,
StreamToolOutputAvailable,
)
logger = logging.getLogger(__name__)
@@ -34,9 +34,9 @@ async def test_stream_chat_completion():
logger.info(chunk)
if isinstance(chunk, StreamError):
has_errors = True
if isinstance(chunk, StreamTextChunk):
assistant_message += chunk.content
if isinstance(chunk, StreamEnd):
if isinstance(chunk, StreamTextDelta):
assistant_message += chunk.delta
if isinstance(chunk, StreamFinish):
has_ended = True
assert has_ended, "Chat completion did not end"
@@ -68,9 +68,9 @@ async def test_stream_chat_completion_with_tool_calls():
if isinstance(chunk, StreamError):
has_errors = True
if isinstance(chunk, StreamEnd):
if isinstance(chunk, StreamFinish):
has_ended = True
if isinstance(chunk, StreamToolExecutionResult):
if isinstance(chunk, StreamToolOutputAvailable):
had_tool_calls = True
assert has_ended, "Chat completion did not end"

View File

@@ -12,7 +12,7 @@ from .find_library_agent import FindLibraryAgentTool
from .run_agent import RunAgentTool
if TYPE_CHECKING:
from backend.api.features.chat.response_model import StreamToolExecutionResult
from backend.api.features.chat.response_model import StreamToolOutputAvailable
# Initialize tool instances
add_understanding_tool = AddUnderstandingTool()
@@ -37,7 +37,7 @@ async def execute_tool(
user_id: str | None,
session: ChatSession,
tool_call_id: str,
) -> "StreamToolExecutionResult":
) -> "StreamToolOutputAvailable":
tool_map: dict[str, BaseTool] = {
"add_understanding": add_understanding_tool,

View File

@@ -8,8 +8,8 @@ from backend.api.features.store import db as store_db
from backend.util.exceptions import DatabaseError, NotFoundError
from .models import (
AgentCarouselResponse,
AgentInfo,
AgentsFoundResponse,
ErrorResponse,
NoResultsResponse,
ToolResponseBase,
@@ -36,7 +36,7 @@ async def search_agents(
user_id: User ID (required for library search)
Returns:
AgentCarouselResponse, NoResultsResponse, or ErrorResponse
AgentsFoundResponse, NoResultsResponse, or ErrorResponse
"""
if not query:
return ErrorResponse(
@@ -142,7 +142,7 @@ async def search_agents(
"/library/agents/{agent_id}. Use agent_output to get execution results, or run_agent to execute."
)
return AgentCarouselResponse(
return AgentsFoundResponse(
message=message,
title=title,
agents=agents,

View File

@@ -6,7 +6,7 @@ from typing import Any
from openai.types.chat import ChatCompletionToolParam
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.response_model import StreamToolExecutionResult
from backend.api.features.chat.response_model import StreamToolOutputAvailable
from .models import ErrorResponse, NeedLoginResponse, ToolResponseBase
@@ -53,7 +53,7 @@ class BaseTool:
session: ChatSession,
tool_call_id: str,
**kwargs,
) -> StreamToolExecutionResult:
) -> StreamToolOutputAvailable:
"""Execute the tool with authentication check.
Args:
@@ -69,10 +69,10 @@ class BaseTool:
logger.error(
f"Attempted tool call for {self.name} but user not authenticated"
)
return StreamToolExecutionResult(
tool_id=tool_call_id,
tool_name=self.name,
result=NeedLoginResponse(
return StreamToolOutputAvailable(
toolCallId=tool_call_id,
toolName=self.name,
output=NeedLoginResponse(
message=f"Please sign in to use {self.name}",
session_id=session.session_id,
).model_dump_json(),
@@ -81,17 +81,17 @@ class BaseTool:
try:
result = await self._execute(user_id, session, **kwargs)
return StreamToolExecutionResult(
tool_id=tool_call_id,
tool_name=self.name,
result=result.model_dump_json(),
return StreamToolOutputAvailable(
toolCallId=tool_call_id,
toolName=self.name,
output=result.model_dump_json(),
)
except Exception as e:
logger.error(f"Error in {self.name}: {e}", exc_info=True)
return StreamToolExecutionResult(
tool_id=tool_call_id,
tool_name=self.name,
result=ErrorResponse(
return StreamToolOutputAvailable(
toolCallId=tool_call_id,
toolName=self.name,
output=ErrorResponse(
message=f"An error occurred while executing {self.name}",
error=str(e),
session_id=session.session_id,

View File

@@ -12,7 +12,7 @@ from backend.data.model import CredentialsMetaInput
class ResponseType(str, Enum):
"""Types of tool responses."""
AGENT_CAROUSEL = "agent_carousel"
AGENTS_FOUND = "agents_found"
AGENT_DETAILS = "agent_details"
SETUP_REQUIREMENTS = "setup_requirements"
EXECUTION_STARTED = "execution_started"
@@ -53,14 +53,14 @@ class AgentInfo(BaseModel):
graph_id: str | None = None
class AgentCarouselResponse(ToolResponseBase):
class AgentsFoundResponse(ToolResponseBase):
"""Response for find_agent tool."""
type: ResponseType = ResponseType.AGENT_CAROUSEL
type: ResponseType = ResponseType.AGENTS_FOUND
title: str = "Available Agents"
agents: list[AgentInfo]
count: int
name: str = "agent_carousel"
name: str = "agents_found"
class NoResultsResponse(ToolResponseBase):

View File

@@ -46,11 +46,11 @@ async def test_run_agent(setup_test_data):
# Verify the response
assert response is not None
assert hasattr(response, "result")
assert hasattr(response, "output")
# Parse the result JSON to verify the execution started
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert isinstance(response.output, str)
result_data = orjson.loads(response.output)
assert "execution_id" in result_data
assert "graph_id" in result_data
assert result_data["graph_id"] == graph.id
@@ -86,11 +86,11 @@ async def test_run_agent_missing_inputs(setup_test_data):
# Verify that we get an error response
assert response is not None
assert hasattr(response, "result")
assert hasattr(response, "output")
# The tool should return an ErrorResponse when setup info indicates not ready
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert isinstance(response.output, str)
result_data = orjson.loads(response.output)
assert "message" in result_data
@@ -118,10 +118,10 @@ async def test_run_agent_invalid_agent_id(setup_test_data):
# Verify that we get an error response
assert response is not None
assert hasattr(response, "result")
assert hasattr(response, "output")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert isinstance(response.output, str)
result_data = orjson.loads(response.output)
assert "message" in result_data
# Should get an error about failed setup or not found
assert any(
@@ -158,12 +158,12 @@ async def test_run_agent_with_llm_credentials(setup_llm_test_data):
# Verify the response
assert response is not None
assert hasattr(response, "result")
assert hasattr(response, "output")
# Parse the result JSON to verify the execution started
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert isinstance(response.output, str)
result_data = orjson.loads(response.output)
# Should successfully start execution since credentials are available
assert "execution_id" in result_data
@@ -195,9 +195,9 @@ async def test_run_agent_shows_available_inputs_when_none_provided(setup_test_da
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert hasattr(response, "output")
assert isinstance(response.output, str)
result_data = orjson.loads(response.output)
# Should return agent_details type showing available inputs
assert result_data.get("type") == "agent_details"
@@ -230,9 +230,9 @@ async def test_run_agent_with_use_defaults(setup_test_data):
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert hasattr(response, "output")
assert isinstance(response.output, str)
result_data = orjson.loads(response.output)
# Should execute successfully
assert "execution_id" in result_data
@@ -260,9 +260,9 @@ async def test_run_agent_missing_credentials(setup_firecrawl_test_data):
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert hasattr(response, "output")
assert isinstance(response.output, str)
result_data = orjson.loads(response.output)
# Should return setup_requirements type with missing credentials
assert result_data.get("type") == "setup_requirements"
@@ -292,9 +292,9 @@ async def test_run_agent_invalid_slug_format(setup_test_data):
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert hasattr(response, "output")
assert isinstance(response.output, str)
result_data = orjson.loads(response.output)
# Should return error
assert result_data.get("type") == "error"
@@ -318,9 +318,9 @@ async def test_run_agent_unauthenticated():
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert hasattr(response, "output")
assert isinstance(response.output, str)
result_data = orjson.loads(response.output)
# Base tool returns need_login type for unauthenticated users
assert result_data.get("type") == "need_login"
@@ -350,9 +350,9 @@ async def test_run_agent_schedule_without_cron(setup_test_data):
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert hasattr(response, "output")
assert isinstance(response.output, str)
result_data = orjson.loads(response.output)
# Should return error about missing cron
assert result_data.get("type") == "error"
@@ -382,9 +382,9 @@ async def test_run_agent_schedule_without_name(setup_test_data):
)
assert response is not None
assert hasattr(response, "result")
assert isinstance(response.result, str)
result_data = orjson.loads(response.result)
assert hasattr(response, "output")
assert isinstance(response.output, str)
result_data = orjson.loads(response.output)
# Should return error about missing schedule_name
assert result_data.get("type") == "error"