fix(backend): address review feedback - error handling and JSON parsing

- Add try-except around _mark_operation_completed to handle Redis failures gracefully
- Wrap upsert_chat_session in try-except to release Redis lock on failure
- Use JSON parsing instead of substring matching for detecting operation types
- Fix hardcoded "Agent creation failed" to use actual tool_name in error messages
- Fix import ordering (isort) and formatting (black)
This commit is contained in:
Zamil Majdy
2026-01-27 11:27:55 -06:00
parent 1dcdc726e5
commit 4371c48814
2 changed files with 74 additions and 49 deletions

View File

@@ -89,10 +89,17 @@ async def _mark_operation_started(tool_call_id: str) -> bool:
async def _mark_operation_completed(tool_call_id: str) -> None:
"""Mark a long-running operation as completed (remove Redis key)."""
redis = await get_redis_async()
key = f"{RUNNING_OPERATION_PREFIX}{tool_call_id}"
await redis.delete(key)
"""Mark a long-running operation as completed (remove Redis key).
This is best-effort - if Redis fails, the TTL will eventually clean up.
"""
try:
redis = await get_redis_async()
key = f"{RUNNING_OPERATION_PREFIX}{tool_call_id}"
await redis.delete(key)
except Exception as e:
# Non-critical: TTL will clean up eventually
logger.warning(f"Failed to delete running operation key {tool_call_id}: {e}")
class LangfuseNotConfiguredError(Exception):
@@ -428,14 +435,22 @@ async def stream_chat_completion(
else orjson.dumps(chunk.output).decode("utf-8")
)
# Skip saving long-running operation responses - messages already saved in _yield_tool_call
is_long_running_response = any(
op_type in result_content
for op_type in ('"operation_started"', '"operation_in_progress"')
)
# Use JSON parsing instead of substring matching to avoid false positives
is_long_running_response = False
try:
parsed = orjson.loads(result_content)
if isinstance(parsed, dict) and parsed.get("type") in (
"operation_started",
"operation_in_progress",
):
is_long_running_response = True
except (orjson.JSONDecodeError, TypeError):
pass # Not JSON or not a dict - treat as regular response
if is_long_running_response:
# Remove from accumulated_tool_calls since assistant message was already saved
accumulated_tool_calls[:] = [
tc for tc in accumulated_tool_calls
tc
for tc in accumulated_tool_calls
if tc["id"] != chunk.toolCallId
]
else:
@@ -1390,41 +1405,50 @@ async def _yield_tool_call(
"check back in a few minutes."
)
# Save assistant message with tool_call FIRST (required by LLM)
assistant_message = ChatMessage(
role="assistant",
content="",
tool_calls=[tool_calls[yield_idx]],
)
session.messages.append(assistant_message)
# Then save pending tool result
pending_message = ChatMessage(
role="tool",
content=OperationPendingResponse(
message=pending_msg,
operation_id=operation_id,
tool_name=tool_name,
).model_dump_json(),
tool_call_id=tool_call_id,
)
session.messages.append(pending_message)
await upsert_chat_session(session)
logger.info(
f"Saved pending operation {operation_id} for tool {tool_name} "
f"in session {session.session_id}"
)
asyncio.create_task(
_execute_long_running_tool(
tool_name=tool_name,
parameters=arguments,
tool_call_id=tool_call_id,
operation_id=operation_id,
session_id=session.session_id,
user_id=session.user_id,
# Wrap session save and task creation in try-except to release lock on failure
try:
# Save assistant message with tool_call FIRST (required by LLM)
assistant_message = ChatMessage(
role="assistant",
content="",
tool_calls=[tool_calls[yield_idx]],
)
)
session.messages.append(assistant_message)
# Then save pending tool result
pending_message = ChatMessage(
role="tool",
content=OperationPendingResponse(
message=pending_msg,
operation_id=operation_id,
tool_name=tool_name,
).model_dump_json(),
tool_call_id=tool_call_id,
)
session.messages.append(pending_message)
await upsert_chat_session(session)
logger.info(
f"Saved pending operation {operation_id} for tool {tool_name} "
f"in session {session.session_id}"
)
asyncio.create_task(
_execute_long_running_tool(
tool_name=tool_name,
parameters=arguments,
tool_call_id=tool_call_id,
operation_id=operation_id,
session_id=session.session_id,
user_id=session.user_id,
)
)
except Exception as e:
# Release the Redis lock since the background task won't be spawned
await _mark_operation_completed(tool_call_id)
logger.error(
f"Failed to setup long-running tool {tool_name}: {e}", exc_info=True
)
raise
# Return immediately - don't wait for completion
yield StreamToolOutputAvailable(
@@ -1536,14 +1560,13 @@ async def _execute_long_running_tool(
except Exception as e:
logger.error(f"Background tool {tool_name} failed: {e}", exc_info=True)
error_response = {
"type": "error",
"message": f"Agent creation failed: {str(e)}",
}
error_response = ErrorResponse(
message=f"Tool {tool_name} failed: {str(e)}",
)
await _update_pending_operation(
session_id=session_id,
tool_call_id=tool_call_id,
result=orjson.dumps(error_response).decode("utf-8"),
result=error_response.model_dump_json(),
success=False,
)
finally:

View File

@@ -120,7 +120,9 @@ export function useChatSession({
if (msg.role !== "tool" || !msg.content) return false;
try {
const content =
typeof msg.content === "string" ? JSON.parse(msg.content) : msg.content;
typeof msg.content === "string"
? JSON.parse(msg.content)
: msg.content;
return content?.type === "operation_pending";
} catch {
return false;