Merge branch 'swiftyos/secrt-1646-review-chat-logic-and-route-models-in-chatpy-and-modelspy' into swiftyos/chat-ui

This commit is contained in:
Swifty
2025-11-04 16:31:44 +01:00
8 changed files with 270 additions and 111 deletions

View File

@@ -39,7 +39,7 @@ class StreamToolCallStart(StreamBaseResponse):
"""Tool call started notification."""
type: ResponseType = ResponseType.TOOL_CALL_START
idx: int = Field(..., description="Index of the tool call")
tool_id: str = Field(..., description="Unique tool call ID")
class StreamToolCall(StreamBaseResponse):

View File

@@ -1,4 +1,9 @@
# AutoGPT Agent Setup Assistant
Your name is Otto.
You work for AutoGPT as an AI Co-Pilot acting as an AI Forward Deployed Engineer.
You were made by AutoGPT.
AutoGPT is an AI Business Automation tool; it helps businesses capture the value from AI to accelerate their growth!
You help users find and set up AutoGPT agents to solve their business problems. **Bias toward action** - move quickly to get agents running.
@@ -7,7 +12,7 @@ You help users find and set up AutoGPT agents to solve their business problems.
1. **find_agent** → Search for agents that solve their problem
2. **get_agent_details** → Get comprehensive info about chosen agent
3. **get_required_setup_info** → Verify user has required credentials (MANDATORY before next step)
4. **setup_agent** or **run_agent** → Execute the agent
4. **schedule_agent** or **run_agent** → Execute the agent
## YOUR APPROACH
@@ -34,7 +39,11 @@ You help users find and set up AutoGPT agents to solve their business problems.
- Explain credentials are added via the frontend interface
### STEP 5: EXECUTE
- Once credentials verified, use `setup_agent` for scheduled runs OR `run_agent` for immediate execution
- Once credentials verified, use `schedule_agent` for scheduled and triggered runs OR `run_agent` for immediate execution
- Confirm successful setup/run
- Provide clear next steps

View File

@@ -169,6 +169,8 @@ async def stream_chat(
async for chunk in chat_service.stream_chat_completion(
session_id, message, is_user_message=is_user_message, user_id=user_id
):
with open("chunks.log", "a") as f:
f.write(f"{session_id}: {chunk}\n")
yield chunk.to_sse()
return StreamingResponse(

View File

@@ -130,8 +130,11 @@ async def stream_chat_completion(
has_yielded_end = False
has_yielded_error = False
has_done_tool_call = False
has_received_text = False
text_streaming_ended = False
messages_to_add: list[ChatMessage] = []
should_retry = False
try:
async for chunk in _stream_chat_chunks(
session=session,
@@ -140,30 +143,17 @@ async def stream_chat_completion(
if isinstance(chunk, StreamTextChunk):
assistant_response.content += chunk.content
has_received_text = True
yield chunk
elif isinstance(chunk, StreamToolCallStart):
# Emit text_ended before first tool call
if not text_streaming_ended:
# Emit text_ended before first tool call, but only if we've received text
if has_received_text and not text_streaming_ended:
yield StreamTextEnded()
text_streaming_ended = True
yield chunk
# elif isinstance(chunk, StreamToolCall):
# messages_to_add.append(
# ChatMessage(
# role="assistant",
# content="",
# tool_calls=[
# {
# "id": chunk.tool_id,
# "type": "function",
# "function": {
# "name": chunk.tool_name,
# "arguments": chunk.arguments,
# },
# }
# ],
# )
# )
elif isinstance(chunk, StreamToolCall):
# Just pass on the tool call notification
pass
elif isinstance(chunk, StreamToolExecutionResult):
result_content = (
chunk.result
@@ -178,6 +168,11 @@ async def stream_chat_completion(
)
)
has_done_tool_call = True
# Track if any tool execution failed
if not chunk.success:
logger.warning(
f"Tool {chunk.tool_name} (ID: {chunk.tool_id}) execution failed"
)
yield chunk
elif isinstance(chunk, StreamEnd):
if not has_done_tool_call:
@@ -185,7 +180,6 @@ async def stream_chat_completion(
yield chunk
elif isinstance(chunk, StreamError):
has_yielded_error = True
yield chunk
elif isinstance(chunk, StreamUsage):
session.usage.append(
Usage(
@@ -198,23 +192,34 @@ async def stream_chat_completion(
logger.error(f"Unknown chunk type: {type(chunk)}", exc_info=True)
except Exception as e:
logger.error(f"Error during stream: {e!s}", exc_info=True)
if assistant_response.content or assistant_response.tool_calls:
messages_to_add.append(assistant_response)
session.messages.extend(messages_to_add)
await upsert_chat_session(session)
# Check if this is a retryable error (JSON parsing, incomplete tool calls, etc.)
is_retryable = isinstance(e, (orjson.JSONDecodeError, KeyError, TypeError))
if retry_count < config.max_retries:
async for chunk in stream_chat_completion(
session_id=session.session_id,
user_id=user_id,
retry_count=retry_count + 1,
):
yield chunk
if is_retryable and retry_count < config.max_retries:
logger.info(
f"Retryable error encountered. Attempt {retry_count + 1}/{config.max_retries}"
)
should_retry = True
else:
# Non-retryable error or max retries exceeded
# Save any partial progress before reporting error
if assistant_response.content or assistant_response.tool_calls:
messages_to_add.append(assistant_response)
session.messages.extend(messages_to_add)
await upsert_chat_session(session)
if not has_yielded_error:
error_message = str(e)
if not is_retryable:
error_message = f"Non-retryable error: {error_message}"
elif retry_count >= config.max_retries:
error_message = (
f"Max retries ({config.max_retries}) exceeded: {error_message}"
)
error_response = StreamError(
message=str(e),
message=error_message,
timestamp=datetime.now(UTC).isoformat(),
)
yield error_response
@@ -222,28 +227,41 @@ async def stream_chat_completion(
yield StreamEnd(
timestamp=datetime.now(UTC).isoformat(),
)
return
finally:
# We always upsert the session even if an error occurs
# So we dont lose track of tool call executions
# Handle retry outside of exception handler to avoid nesting
if should_retry and retry_count < config.max_retries:
logger.info(
f"Upserting session: {session.session_id} with user id {session.user_id}"
f"Retrying stream_chat_completion for session {session_id}, attempt {retry_count + 1}"
)
# Only append assistant response if it has content or tool calls
# to avoid saving empty messages on errors
if assistant_response.content or assistant_response.tool_calls:
messages_to_add.append(assistant_response)
session.messages.extend(messages_to_add)
await upsert_chat_session(session)
# If we haven't done a tool call, stream the chat completion again to get the tool call
if has_done_tool_call:
logger.info(
"No tool call done, streaming chat completion again to get the tool call"
)
async for chunk in stream_chat_completion(
session_id=session.session_id, user_id=user_id
):
yield chunk
async for chunk in stream_chat_completion(
session_id=session.session_id,
user_id=user_id,
retry_count=retry_count + 1,
):
yield chunk
return # Exit after retry to avoid double-saving in finally block
# Normal completion path - save session and handle tool call continuation
logger.info(
f"Upserting session: {session.session_id} with user id {session.user_id}"
)
# Only append assistant response if it has content or tool calls
# to avoid saving empty messages on errors
if assistant_response.content or assistant_response.tool_calls:
messages_to_add.append(assistant_response)
session.messages.extend(messages_to_add)
await upsert_chat_session(session)
# If we did a tool call, stream the chat completion again to get the next response
if has_done_tool_call:
logger.info(
"Tool call executed, streaming chat completion again to get assistant response"
)
async for chunk in stream_chat_completion(
session_id=session.session_id, user_id=user_id
):
yield chunk
async def _stream_chat_chunks(
@@ -286,11 +304,12 @@ async def _stream_chat_chunks(
tool_calls: list[dict[str, Any]] = []
active_tool_call_idx: int | None = None
finish_reason: str | None = None
# Track which tool call indices have had their start event emitted
emitted_start_for_idx: set[int] = set()
# Process the stream
chunk: ChatCompletionChunk
async for chunk in stream:
logger.info(f"Chunk: \n\n{chunk}")
if chunk.usage:
yield StreamUsage(
prompt_tokens=chunk.usage.prompt_tokens,
@@ -320,27 +339,12 @@ async def _stream_chat_chunks(
if delta.tool_calls:
for tc_chunk in delta.tool_calls:
idx = tc_chunk.index
if active_tool_call_idx is None:
active_tool_call_idx = idx
yield StreamToolCallStart(
idx=idx,
timestamp=datetime.now(UTC).isoformat(),
)
# When we start receiving a new tool call (higher index),
# yield the previous one since it's now complete
# (OpenAI streams tool calls with incrementing indices)
if active_tool_call_idx != idx:
yield StreamToolCallStart(
idx=idx,
timestamp=datetime.now(UTC).isoformat(),
)
yield_idx = idx - 1
async for tc in _yield_tool_call(
tool_calls, yield_idx, session
):
yield tc
# Update to track the new active tool call
# Update active tool call index if needed
if (
active_tool_call_idx is None
or active_tool_call_idx != idx
):
active_tool_call_idx = idx
# Ensure we have a tool call object at this index
@@ -368,21 +372,37 @@ async def _stream_chat_chunks(
tool_calls[idx]["function"][
"arguments"
] += tc_chunk.function.arguments
# Emit StreamToolCallStart only after we have the tool call ID
if (
idx not in emitted_start_for_idx
and tool_calls[idx]["id"]
):
yield StreamToolCallStart(
tool_id=tool_calls[idx]["id"],
timestamp=datetime.now(UTC).isoformat(),
)
emitted_start_for_idx.add(idx)
logger.info(f"Stream complete. Finish reason: {finish_reason}")
# Yield the final tool call if any were accumulated
if active_tool_call_idx is not None and active_tool_call_idx < len(
tool_calls
):
async for tc in _yield_tool_call(
tool_calls, active_tool_call_idx, session
):
yield tc
elif active_tool_call_idx is not None:
logger.warning(
f"Active tool call index {active_tool_call_idx} out of bounds "
f"(tool_calls length: {len(tool_calls)})"
)
# Yield all accumulated tool calls after the stream is complete
# This ensures all tool call arguments have been fully received
for idx, tool_call in enumerate(tool_calls):
try:
async for tc in _yield_tool_call(tool_calls, idx, session):
yield tc
except (orjson.JSONDecodeError, KeyError, TypeError) as e:
logger.error(
f"Failed to parse tool call {idx}: {e}",
exc_info=True,
extra={"tool_call": tool_call},
)
yield StreamError(
message=f"Invalid tool call arguments for tool {tool_call.get('function', {}).get('name', 'unknown')}: {e}",
timestamp=datetime.now(UTC).isoformat(),
)
# Re-raise to trigger retry logic in the parent function
raise
yield StreamEnd(
timestamp=datetime.now(UTC).isoformat(),
@@ -407,26 +427,17 @@ async def _yield_tool_call(
session: ChatSession,
) -> AsyncGenerator[StreamBaseResponse, None]:
"""
Yield a tool call.
Yield a tool call and its execution result.
Raises:
orjson.JSONDecodeError: If tool call arguments cannot be parsed as JSON
KeyError: If expected tool call fields are missing
TypeError: If tool call structure is invalid
"""
logger.info(f"Yielding tool call: {tool_calls[yield_idx]}")
# Parse tool call arguments with error handling
try:
arguments = orjson.loads(tool_calls[yield_idx]["function"]["arguments"])
except (orjson.JSONDecodeError, KeyError, TypeError) as e:
logger.error(
f"Failed to parse tool call arguments: {e}",
exc_info=True,
extra={
"tool_call": tool_calls[yield_idx],
},
)
yield StreamError(
message=f"Invalid tool call arguments: {e}",
timestamp=datetime.now(UTC).isoformat(),
)
return
# Parse tool call arguments - exceptions will propagate to caller
arguments = orjson.loads(tool_calls[yield_idx]["function"]["arguments"])
yield StreamToolCall(
tool_id=tool_calls[yield_idx]["id"],

View File

@@ -203,7 +203,7 @@ class GetAgentDetailsTool(BaseTool):
)
return AgentDetailsResponse(
message=f"Found agent '{agent_details.name}'",
message=f"Found agent '{agent_details.name}'. You do not need to run this tool again for this agent.",
session_id=session_id,
agent=agent_details,
user_authenticated=user_id is not None,

View File

@@ -158,8 +158,19 @@ class GetRequiredSetupInfoTool(BaseTool):
"inputs": inputs_list,
"execution_modes": execution_modes,
}
message = ""
if len(agent_details.agent.credentials) > 0:
message = "The user needs to enter credentials before proceeding. Please wait until you have a message informing you that the credentials have been entered."
elif len(inputs_list) > 0:
message = (
"The user needs to enter inputs before proceeding. Please wait until you have a message informing you that the inputs have been entered. The inputs are: "
+ ", ".join([input["name"] for input in inputs_list])
)
else:
message = "The agent is ready to run. Please call the run_agent tool with the agent ID."
return SetupRequirementsResponse(
message="Agent details retrieved successfully",
message=message,
session_id=session_id,
setup_info=SetupInfo(
agent_id=agent_details.agent.id,

View File

@@ -4,7 +4,9 @@ import logging
from typing import Any
from backend.data.graph import get_graph
from backend.data.model import CredentialsMetaInput
from backend.executor import utils as execution_utils
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.server.v2.chat.tools.base import BaseTool
from backend.server.v2.chat.tools.get_required_setup_info import (
GetRequiredSetupInfoTool,
@@ -142,15 +144,96 @@ class RunAgentTool(BaseTool):
else:
library_agent = existing_library_agent
# Build credentials mapping for the graph
graph_credentials_inputs: dict[str, CredentialsMetaInput] = {}
# Get aggregated credentials requirements from the graph
aggregated_creds = graph.aggregate_credentials_inputs()
logger.debug(
f"Matching credentials for graph {graph.id}: {len(aggregated_creds)} required"
)
if aggregated_creds:
# Get all available credentials for the user
creds_manager = IntegrationCredentialsManager()
available_creds = await creds_manager.store.get_all_creds(user_id)
# Track unmatched credentials for error reporting
missing_creds: list[str] = []
# For each required credential field, find a matching user credential
# field_info.provider is a frozenset because aggregate_credentials_inputs()
# combines requirements from multiple nodes. A credential matches if its
# provider is in the set of acceptable providers.
for credential_field_name, (
credential_requirements,
_node_fields,
) in aggregated_creds.items():
# Find first matching credential by provider and type
matching_cred = next(
(
cred
for cred in available_creds
if cred.provider in credential_requirements.provider
and cred.type in credential_requirements.supported_types
),
None,
)
if matching_cred:
# Use Pydantic validation to ensure type safety
try:
graph_credentials_inputs[credential_field_name] = (
CredentialsMetaInput(
id=matching_cred.id,
provider=matching_cred.provider, # type: ignore
type=matching_cred.type,
title=matching_cred.title,
)
)
except Exception as e:
logger.error(
f"Failed to create CredentialsMetaInput for field '{credential_field_name}': "
f"provider={matching_cred.provider}, type={matching_cred.type}, "
f"credential_id={matching_cred.id}",
exc_info=True,
)
missing_creds.append(
f"{credential_field_name} (validation failed: {e})"
)
else:
missing_creds.append(
f"{credential_field_name} "
f"(requires provider in {list(credential_requirements.provider)}, "
f"type in {list(credential_requirements.supported_types)})"
)
# Fail fast if any required credentials are missing
if missing_creds:
logger.warning(
f"Cannot execute agent - missing credentials: {missing_creds}"
)
return ErrorResponse(
message=f"Cannot execute agent: missing {len(missing_creds)} required credential(s). You need to call the get_required_setup_info tool to setup the credentials."
f"Please set up the following credentials: {', '.join(missing_creds)}",
session_id=session_id,
details={"missing_credentials": missing_creds},
)
logger.info(
f"Credential matching complete: {len(graph_credentials_inputs)}/{len(aggregated_creds)} matched"
)
# At this point we know the user is ready to run the agent
# So we can execute the agent
execution = await execution_utils.add_graph_execution(
graph_id=library_agent.graph_id,
user_id=user_id,
inputs=inputs,
graph_credentials_inputs=graph_credentials_inputs,
)
return ExecutionStartedResponse(
message="Agent execution started",
message="Agent execution successfully started. Do not run this tool again unless specifically asked to run the agent again.",
session_id=session_id,
execution_id=execution.id,
graph_id=library_agent.graph_id,

View File

@@ -8,6 +8,7 @@ from pydantic import BaseModel
from backend.data.graph import get_graph
from backend.data.model import CredentialsMetaInput
from backend.data.user import get_user_by_id
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.server.v2.chat.tools.get_required_setup_info import (
GetRequiredSetupInfoTool,
)
@@ -43,7 +44,7 @@ class SetupAgentTool(BaseTool):
@property
def name(self) -> str:
return "setup_agent"
return "schedule_agent"
@property
def description(self) -> str:
@@ -193,6 +194,48 @@ class SetupAgentTool(BaseTool):
user = await get_user_by_id(user_id)
user_timezone = get_user_timezone_or_utc(user.timezone if user else None)
# Map required credentials (schema field names) to actual user credential IDs
# credentials param contains CredentialsMetaInput with schema field names as keys
# We need to find the user's actual credentials that match the provider/type
creds_manager = IntegrationCredentialsManager()
user_credentials = await creds_manager.store.get_all_creds(user_id)
# Build a mapping from schema field name -> actual credential ID
resolved_credentials: dict[str, CredentialsMetaInput] = {}
missing_credentials: list[str] = []
for field_name, cred_meta in credentials.items():
# Find a matching credential from the user's credentials
matching_cred = next(
(
c
for c in user_credentials
if c.provider == cred_meta.provider and c.type == cred_meta.type
),
None,
)
if matching_cred:
# Use the actual credential ID instead of the schema field name
# Create a new CredentialsMetaInput with the actual credential ID
# but keep the same provider/type from the original meta
resolved_credentials[field_name] = CredentialsMetaInput(
id=matching_cred.id,
provider=cred_meta.provider,
type=cred_meta.type,
title=cred_meta.title,
)
else:
missing_credentials.append(
f"{cred_meta.title} ({cred_meta.provider}/{cred_meta.type})"
)
if missing_credentials:
return ErrorResponse(
message=f"Cannot execute agent: missing {len(missing_credentials)} required credential(s). You need to call the get_required_setup_info tool to setup the credentials.",
session_id=session_id,
)
result = await get_scheduler_client().add_execution_schedule(
user_id=user_id,
graph_id=library_agent.graph_id,
@@ -200,7 +243,7 @@ class SetupAgentTool(BaseTool):
name=name,
cron=cron,
input_data=inputs,
input_credentials=credentials,
input_credentials=resolved_credentials,
user_timezone=user_timezone,
)
@@ -210,7 +253,7 @@ class SetupAgentTool(BaseTool):
result.next_run_time, user_timezone
)
return ExecutionStartedResponse(
message="Agent execution started",
message="Agent execution successfully scheduled. Do not run this tool again unless specifically asked to run the agent again.",
session_id=session_id,
execution_id=result.id,
graph_id=library_agent.graph_id,