From 4371c48814eeb61ebfeeac5d7d3618b317104399 Mon Sep 17 00:00:00 2001 From: Zamil Majdy Date: Tue, 27 Jan 2026 11:27:55 -0600 Subject: [PATCH] fix(backend): address review feedback - error handling and JSON parsing - Add try-except around _mark_operation_completed to handle Redis failures gracefully - Wrap upsert_chat_session in try-except to release Redis lock on failure - Use JSON parsing instead of substring matching for detecting operation types - Fix hardcoded "Agent creation failed" to use actual tool_name in error messages - Fix import ordering (isort) and formatting (black) --- .../backend/api/features/chat/service.py | 119 +++++++++++------- .../contextual/Chat/useChatSession.ts | 4 +- 2 files changed, 74 insertions(+), 49 deletions(-) diff --git a/autogpt_platform/backend/backend/api/features/chat/service.py b/autogpt_platform/backend/backend/api/features/chat/service.py index 2a3e2f9c16..8b0b6f0e96 100644 --- a/autogpt_platform/backend/backend/api/features/chat/service.py +++ b/autogpt_platform/backend/backend/api/features/chat/service.py @@ -89,10 +89,17 @@ async def _mark_operation_started(tool_call_id: str) -> bool: async def _mark_operation_completed(tool_call_id: str) -> None: - """Mark a long-running operation as completed (remove Redis key).""" - redis = await get_redis_async() - key = f"{RUNNING_OPERATION_PREFIX}{tool_call_id}" - await redis.delete(key) + """Mark a long-running operation as completed (remove Redis key). + + This is best-effort - if Redis fails, the TTL will eventually clean up. + """ + try: + redis = await get_redis_async() + key = f"{RUNNING_OPERATION_PREFIX}{tool_call_id}" + await redis.delete(key) + except Exception as e: + # Non-critical: TTL will clean up eventually + logger.warning(f"Failed to delete running operation key {tool_call_id}: {e}") class LangfuseNotConfiguredError(Exception): @@ -428,14 +435,22 @@ async def stream_chat_completion( else orjson.dumps(chunk.output).decode("utf-8") ) # Skip saving long-running operation responses - messages already saved in _yield_tool_call - is_long_running_response = any( - op_type in result_content - for op_type in ('"operation_started"', '"operation_in_progress"') - ) + # Use JSON parsing instead of substring matching to avoid false positives + is_long_running_response = False + try: + parsed = orjson.loads(result_content) + if isinstance(parsed, dict) and parsed.get("type") in ( + "operation_started", + "operation_in_progress", + ): + is_long_running_response = True + except (orjson.JSONDecodeError, TypeError): + pass # Not JSON or not a dict - treat as regular response if is_long_running_response: # Remove from accumulated_tool_calls since assistant message was already saved accumulated_tool_calls[:] = [ - tc for tc in accumulated_tool_calls + tc + for tc in accumulated_tool_calls if tc["id"] != chunk.toolCallId ] else: @@ -1390,41 +1405,50 @@ async def _yield_tool_call( "check back in a few minutes." ) - # Save assistant message with tool_call FIRST (required by LLM) - assistant_message = ChatMessage( - role="assistant", - content="", - tool_calls=[tool_calls[yield_idx]], - ) - session.messages.append(assistant_message) - - # Then save pending tool result - pending_message = ChatMessage( - role="tool", - content=OperationPendingResponse( - message=pending_msg, - operation_id=operation_id, - tool_name=tool_name, - ).model_dump_json(), - tool_call_id=tool_call_id, - ) - session.messages.append(pending_message) - await upsert_chat_session(session) - logger.info( - f"Saved pending operation {operation_id} for tool {tool_name} " - f"in session {session.session_id}" - ) - - asyncio.create_task( - _execute_long_running_tool( - tool_name=tool_name, - parameters=arguments, - tool_call_id=tool_call_id, - operation_id=operation_id, - session_id=session.session_id, - user_id=session.user_id, + # Wrap session save and task creation in try-except to release lock on failure + try: + # Save assistant message with tool_call FIRST (required by LLM) + assistant_message = ChatMessage( + role="assistant", + content="", + tool_calls=[tool_calls[yield_idx]], ) - ) + session.messages.append(assistant_message) + + # Then save pending tool result + pending_message = ChatMessage( + role="tool", + content=OperationPendingResponse( + message=pending_msg, + operation_id=operation_id, + tool_name=tool_name, + ).model_dump_json(), + tool_call_id=tool_call_id, + ) + session.messages.append(pending_message) + await upsert_chat_session(session) + logger.info( + f"Saved pending operation {operation_id} for tool {tool_name} " + f"in session {session.session_id}" + ) + + asyncio.create_task( + _execute_long_running_tool( + tool_name=tool_name, + parameters=arguments, + tool_call_id=tool_call_id, + operation_id=operation_id, + session_id=session.session_id, + user_id=session.user_id, + ) + ) + except Exception as e: + # Release the Redis lock since the background task won't be spawned + await _mark_operation_completed(tool_call_id) + logger.error( + f"Failed to setup long-running tool {tool_name}: {e}", exc_info=True + ) + raise # Return immediately - don't wait for completion yield StreamToolOutputAvailable( @@ -1536,14 +1560,13 @@ async def _execute_long_running_tool( except Exception as e: logger.error(f"Background tool {tool_name} failed: {e}", exc_info=True) - error_response = { - "type": "error", - "message": f"Agent creation failed: {str(e)}", - } + error_response = ErrorResponse( + message=f"Tool {tool_name} failed: {str(e)}", + ) await _update_pending_operation( session_id=session_id, tool_call_id=tool_call_id, - result=orjson.dumps(error_response).decode("utf-8"), + result=error_response.model_dump_json(), success=False, ) finally: diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts b/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts index 3f232d9f8d..a7b8842b89 100644 --- a/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts +++ b/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts @@ -120,7 +120,9 @@ export function useChatSession({ if (msg.role !== "tool" || !msg.content) return false; try { const content = - typeof msg.content === "string" ? JSON.parse(msg.content) : msg.content; + typeof msg.content === "string" + ? JSON.parse(msg.content) + : msg.content; return content?.type === "operation_pending"; } catch { return false;