fix(chat): use Redis-based tracking for long-running operations (multi-pod support)

Replace in-memory _running_operations dict with Redis-based tracking
to support Kubernetes deployments with multiple pods. The in-memory
dict only tracked operations per-process, allowing duplicate operations
to start if requests hit different pods.

Changes:
- Add _mark_operation_started() using Redis SETNX for atomic idempotency
- Add _mark_operation_completed() to clear Redis key on completion
- Use Redis key "chat:running_operation:{tool_call_id}" with 10min TTL
- Update _execute_long_running_tool() to clear key in finally block
This commit is contained in:
Zamil Majdy
2026-01-27 09:55:26 -06:00
parent a529a16d70
commit abe3707f17

View File

@@ -17,6 +17,7 @@ from openai import (
)
from openai.types.chat import ChatCompletionChunk, ChatCompletionToolParam
from backend.data.redis_client import get_redis_async
from backend.data.understanding import (
format_understanding_for_prompt,
get_business_understanding,
@@ -68,9 +69,30 @@ client = openai.AsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
langfuse = get_client()
# In-memory tracking of running long-running operations (tool_call_id -> asyncio.Task)
# Used for idempotency - prevents duplicate executions on browser refresh
_running_operations: dict[str, asyncio.Task] = {}
# Redis key prefix for tracking running long-running operations.
# The full key is this prefix followed by the tool_call_id,
# i.e. "chat:running_operation:<tool_call_id>".
# Used for idempotency across Kubernetes pods - prevents duplicate executions on browser refresh
RUNNING_OPERATION_PREFIX = "chat:running_operation:"
# Expiry, in seconds, attached to the key when it is written, so a key that is
# never explicitly deleted still disappears on its own.
RUNNING_OPERATION_TTL = 600 # 10 minutes - safety net if pod dies during execution
async def _mark_operation_started(tool_call_id: str) -> bool:
    """Try to claim a long-running operation by writing its Redis marker key.

    The key is ``RUNNING_OPERATION_PREFIX + tool_call_id`` and is written with
    both ``nx=True`` (only set if the key does not exist yet) and
    ``ex=RUNNING_OPERATION_TTL`` (expire automatically), in a single SET call.

    Args:
        tool_call_id: Identifier of the tool call being started.

    Returns:
        True if the SET call reported that the key was written (this caller
        now owns the operation); False if it returned ``None``, meaning the
        key already existed and the operation is already marked as running.
    """
    client = await get_redis_async()
    # One SET with NX + EX keeps "check" and "claim" in the same command, so
    # two concurrent callers cannot both see the key as absent.
    claimed = await client.set(
        f"{RUNNING_OPERATION_PREFIX}{tool_call_id}",
        "1",
        nx=True,
        ex=RUNNING_OPERATION_TTL,
    )
    if claimed is None:
        return False
    return True
async def _mark_operation_completed(tool_call_id: str) -> None:
    """Remove the Redis marker key for a long-running operation.

    Deletes ``RUNNING_OPERATION_PREFIX + tool_call_id`` so that the same
    tool call is no longer reported as running.

    Args:
        tool_call_id: Identifier of the tool call that has finished.
    """
    client = await get_redis_async()
    await client.delete(f"{RUNNING_OPERATION_PREFIX}{tool_call_id}")
class LangfuseNotConfiguredError(Exception):
@@ -1302,32 +1324,65 @@ async def _yield_tool_call(
# Check if this tool is long-running (survives SSE disconnection)
tool = get_tool(tool_name)
if tool and tool.is_long_running:
# Idempotency check - if this tool_call_id is already running, return status
if tool_call_id in _running_operations:
existing_task = _running_operations[tool_call_id]
if not existing_task.done():
logger.info(
f"Tool call {tool_call_id} already in progress, returning status"
)
yield StreamToolOutputAvailable(
toolCallId=tool_call_id,
toolName=tool_name,
output=OperationInProgressResponse(
message="Agent creation already in progress. Please wait...",
tool_call_id=tool_call_id,
).model_dump_json(),
success=True,
)
return
# Atomic check-and-set: returns False if operation already running (lost race)
if not await _mark_operation_started(tool_call_id):
logger.info(
f"Tool call {tool_call_id} already in progress, returning status"
)
yield StreamToolOutputAvailable(
toolCallId=tool_call_id,
toolName=tool_name,
output=OperationInProgressResponse(
message="Agent creation already in progress. Please wait...",
tool_call_id=tool_call_id,
).model_dump_json(),
success=True,
)
return
# Generate operation ID
operation_id = str(uuid_module.uuid4())
# Build a user-friendly message based on tool and arguments
if tool_name == "create_agent":
agent_desc = arguments.get("description", "")
# Truncate long descriptions for the message
desc_preview = (
(agent_desc[:100] + "...") if len(agent_desc) > 100 else agent_desc
)
pending_msg = (
f"Creating your agent: {desc_preview}"
if desc_preview
else "Creating agent... This may take a few minutes."
)
started_msg = (
"Agent creation started. You can close this tab - "
"check your library in a few minutes."
)
elif tool_name == "edit_agent":
changes = arguments.get("changes", "")
changes_preview = (changes[:100] + "...") if len(changes) > 100 else changes
pending_msg = (
f"Editing agent: {changes_preview}"
if changes_preview
else "Editing agent... This may take a few minutes."
)
started_msg = (
"Agent edit started. You can close this tab - "
"check your library in a few minutes."
)
else:
pending_msg = f"Running {tool_name}... This may take a few minutes."
started_msg = (
f"{tool_name} started. You can close this tab - "
"check back in a few minutes."
)
# Save "pending" tool response to chat history immediately
pending_message = ChatMessage(
role="tool",
content=OperationPendingResponse(
message="Creating agent... This may take a few minutes.",
message=pending_msg,
operation_id=operation_id,
tool_name=tool_name,
).model_dump_json(),
@@ -1340,8 +1395,7 @@ async def _yield_tool_call(
f"in session {session.session_id}"
)
# Start background task (NOT tied to SSE)
task = asyncio.create_task(
asyncio.create_task(
_execute_long_running_tool(
tool_name=tool_name,
parameters=arguments,
@@ -1351,19 +1405,13 @@ async def _yield_tool_call(
user_id=session.user_id,
)
)
# Track for idempotency and cleanup
_running_operations[tool_call_id] = task
task.add_done_callback(lambda _: _running_operations.pop(tool_call_id, None))
# Return immediately - don't wait for completion
yield StreamToolOutputAvailable(
toolCallId=tool_call_id,
toolName=tool_name,
output=OperationStartedResponse(
message=(
"Agent creation started. You can close this tab - "
"check your library in a few minutes."
),
message=started_msg,
operation_id=operation_id,
tool_name=tool_name,
).model_dump_json(),
@@ -1478,6 +1526,8 @@ async def _execute_long_running_tool(
result=orjson.dumps(error_response).decode("utf-8"),
success=False,
)
finally:
await _mark_operation_completed(tool_call_id)
async def _update_pending_operation(