Compare commits

24 Commits

Author SHA1 Message Date
Saurya Velagapudi
e426ce2138 Merge branch 'main' into fix/slack-runtime-unavailable-error 2026-01-23 15:15:57 -08:00
Saurya Velagapudi
92f90b633c Merge branch 'main' into fix/slack-runtime-unavailable-error 2026-01-21 09:28:42 -08:00
Saurya Velagapudi
ea1e9dab9c Merge branch 'main' into fix/slack-runtime-unavailable-error 2026-01-15 17:12:02 -08:00
openhands
44be37b8b5 Fix lint errors in slack_view.py
- Fix import ordering (move openhands.sdk import to correct position)
- Remove blank line between imports in method
- Fix assert statement formatting

Co-authored-by: openhands <openhands@all-hands.dev>
2026-01-15 20:58:06 +00:00
openhands
0c4d73f33a Remove TooManyWaitingError and max concurrent limit from runtime wait tracker
Per user request, removing the concurrency limit until we have production data
about how many runtime waits are actually occurring. The Prometheus metrics
remain in place for monitoring.

Changes:
- Remove TooManyWaitingError exception class
- Remove MAX_CONCURRENT_RUNTIME_WAITS config and limit check
- Remove get_max_concurrent_waits() helper function
- Simplify track_runtime_wait() to only track metrics
- Update slack_view.py to remove TooManyWaitingError handling
- Remove test_too_many_waiting_error test case

Co-authored-by: openhands <openhands@all-hands.dev>
2026-01-15 20:55:10 +00:00
openhands
64492d35bf Fix ruff-format lint error in saas_nested_conversation_manager.py
Collapse multi-line redis.set() call to single line to satisfy ruff-format.

Co-authored-by: openhands <openhands@all-hands.dev>
2026-01-15 19:46:28 +00:00
Saurya Velagapudi
75928ebab0 Merge branch 'main' into fix/slack-runtime-unavailable-error 2026-01-15 11:43:10 -08:00
openhands
b7e5902008 Fix race condition in maybe_start_agent_loop using atomic SETNX
Use Redis SETNX (set if not exists) to atomically check and set the
starting flag. This prevents race conditions where multiple concurrent
callers could start duplicate agent loops.

Before: check-then-set was not atomic, allowing race conditions
After: atomic SETNX ensures only one caller starts the agent loop
2026-01-15 18:14:29 +00:00
Saurya Velagapudi
8fba1dd5c0 Merge branch 'main' into fix/slack-runtime-unavailable-error 2026-01-15 08:54:21 -08:00
Saurya Velagapudi
8a7a615f33 Merge branch 'main' into fix/slack-runtime-unavailable-error 2026-01-14 21:11:36 -08:00
Saurya Velagapudi
87a10b3c24 Merge branch 'main' into fix/slack-runtime-unavailable-error 2026-01-14 15:38:39 -08:00
Saurya Velagapudi
558a9649d4 Merge branch 'main' into fix/slack-runtime-unavailable-error 2026-01-14 13:22:28 -08:00
openhands
bed46a0d49 Fix lint and mypy errors
- Fix mypy 'unreachable' error in test by using try/except instead of pytest.raises
- Apply ruff formatting fixes (single-line raise, import sorting)

Co-authored-by: openhands <openhands@all-hands.dev>
2026-01-14 20:56:05 +00:00
openhands
c0c4efe77a Fix lint: sort imports in slack_view.py
Co-authored-by: openhands <openhands@all-hands.dev>
2026-01-14 20:40:17 +00:00
openhands
fe0cb83cec Improve error message: hide internal details from users
- Change user-facing message to generic 'Something went wrong. Please try again later.'
- Change log level from warning to error for rejected waits
- Keep detailed info (wait count, max limit) in server logs only

Co-authored-by: openhands <openhands@all-hands.dev>
2026-01-14 20:24:31 +00:00
openhands
7a244f24ac Add concurrency limits and Prometheus metrics for runtime wait
- Add runtime_wait_tracker.py module with:
  - Configurable max concurrent waits (SLACK_MAX_CONCURRENT_RUNTIME_WAITS env var, default 100)
  - Prometheus metrics: slack_runtime_wait_current, slack_runtime_wait_total, slack_runtime_wait_completed
  - TooManyWaitingError exception when limit is exceeded
- Integrate tracker into send_message_to_v0_conversation
- Add tests for the tracker functionality

Co-authored-by: openhands <openhands@all-hands.dev>
2026-01-14 20:21:08 +00:00
openhands
c12790bd1a Add tests for _wait_for_runtime_ready method
Tests cover:
- Immediate success when runtime is already ready
- Polling until runtime becomes ready (key fix for #12325)
- Polling when conversation status is STARTING
- Timeout behavior when runtime takes too long
- Handling empty observations gracefully

Co-authored-by: openhands <openhands@all-hands.dev>
2026-01-14 20:09:05 +00:00
Saurya Velagapudi
d098cfa068 Merge branch 'main' into fix/slack-runtime-unavailable-error 2026-01-14 11:53:46 -08:00
openhands
f4299afb3e Fix: Prefix unused variable with underscore to satisfy ruff F841
Co-authored-by: openhands <openhands@all-hands.dev>
2026-01-14 17:33:53 +00:00
Saurya Velagapudi
36f6e231ec Merge branch 'main' into fix/slack-runtime-unavailable-error 2026-01-14 09:30:36 -08:00
Saurya Velagapudi
5e2ca065e6 Merge branch 'main' into fix/slack-runtime-unavailable-error 2026-01-09 10:55:43 -08:00
Saurya Velagapudi
44dde42457 Update slack_view.py
Change the type of providers_set to match the provided
2026-01-08 18:33:18 -08:00
Saurya Velagapudi
9c6fe05586 Merge branch 'main' into fix/slack-runtime-unavailable-error 2026-01-08 17:42:05 -08:00
openhands
abdd41054b Fix: Wait for runtime to be ready in Slack follow-up messages
When a user sends a follow-up message to an existing Slack conversation
and the runtime is down, instead of returning an error, we now wait for
the runtime to come up before sending the message.

Since Slack is asynchronous, users don't need to know about internal
runtime state - they just see a slight delay before getting a response.

Changes:
- Add _wait_for_runtime_ready() method that polls until runtime is ready
- Remove immediate error when agent_state is LOADING
- Wait up to 120 seconds for runtime to start (with 2s poll interval)
- Only show error if runtime takes too long to start

Fixes #12325

Co-authored-by: openhands <openhands@all-hands.dev>
2026-01-09 00:04:47 +00:00
4 changed files with 419 additions and 19 deletions

@@ -0,0 +1,67 @@
+"""Tracks concurrent runtime wait operations for Slack follow-up messages.
+
+This module provides Prometheus metrics for monitoring wait operations in production.
+"""
+
+from contextlib import asynccontextmanager
+
+from prometheus_client import Counter, Gauge
+
+from openhands.core.logger import openhands_logger as logger
+
+# Prometheus metrics
+RUNTIME_WAIT_CURRENT = Gauge(
+    'slack_runtime_wait_current',
+    'Current number of Slack follow-up messages waiting for runtime to be ready',
+)
+
+RUNTIME_WAIT_TOTAL = Counter(
+    'slack_runtime_wait_total',
+    'Total number of Slack follow-up messages that started waiting for runtime',
+)
+
+RUNTIME_WAIT_COMPLETED = Counter(
+    'slack_runtime_wait_completed',
+    'Total number of Slack follow-up messages that completed waiting for runtime',
+    ['status'],  # 'success', 'timeout', 'error'
+)
+
+# Internal counter for tracking concurrent waits
+_current_wait_count = 0
+
+
+@asynccontextmanager
+async def track_runtime_wait():
+    """Context manager to track runtime wait operations.
+
+    Usage:
+        async with track_runtime_wait():
+            await _wait_for_runtime_ready(...)
+    """
+    global _current_wait_count
+
+    _current_wait_count += 1
+    RUNTIME_WAIT_CURRENT.set(_current_wait_count)
+    RUNTIME_WAIT_TOTAL.inc()
+
+    logger.info(f'Slack runtime wait started: {_current_wait_count} now waiting')
+
+    try:
+        yield
+        RUNTIME_WAIT_COMPLETED.labels(status='success').inc()
+    except Exception as e:
+        # Determine if this was a timeout or other error
+        if 'taking too long' in str(e):
+            RUNTIME_WAIT_COMPLETED.labels(status='timeout').inc()
+        else:
+            RUNTIME_WAIT_COMPLETED.labels(status='error').inc()
+        raise
+    finally:
+        _current_wait_count -= 1
+        RUNTIME_WAIT_CURRENT.set(_current_wait_count)
+        logger.info(f'Slack runtime wait ended: {_current_wait_count} now waiting')
+
+
+def get_current_wait_count() -> int:
+    """Get the current number of waiting coroutines (for testing/debugging)."""
+    return _current_wait_count
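
The try/except/finally bookkeeping in this module can be tried without `prometheus_client` by swapping the metrics for plain Python counters. The following is a minimal sketch under stated assumptions, not the module itself: `WaitStats`, `WaitTimeout`, `track_wait`, and `demo` are hypothetical names, and the outcome is classified by exception type instead of by matching the message text as the diff does.

```python
import asyncio
from collections import Counter
from contextlib import asynccontextmanager


class WaitTimeout(Exception):
    """Hypothetical exception standing in for a wait that took too long."""


class WaitStats:
    """Plain-Python stand-in for the gauge and counters (hypothetical)."""

    def __init__(self) -> None:
        self.current = 0            # waits in flight right now
        self.started = 0            # waits ever started
        self.completed = Counter()  # outcome label -> count


@asynccontextmanager
async def track_wait(stats: WaitStats):
    stats.current += 1
    stats.started += 1
    try:
        yield
        stats.completed['success'] += 1
    except WaitTimeout:
        stats.completed['timeout'] += 1
        raise
    except Exception:
        stats.completed['error'] += 1
        raise
    finally:
        # Runs on every path, so the in-flight count cannot leak.
        stats.current -= 1


async def demo() -> WaitStats:
    stats = WaitStats()

    async def ok() -> None:
        async with track_wait(stats):
            await asyncio.sleep(0)

    async def slow() -> None:
        async with track_wait(stats):
            raise WaitTimeout('taking too long')

    await ok()
    try:
        await slow()
    except WaitTimeout:
        pass
    return stats
```

Running `asyncio.run(demo())` records one successful and one timed-out wait and leaves the in-flight count at zero, which is the same invariant the tracker tests in this change check through `get_current_wait_count()`.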

@@ -3,6 +3,7 @@ from uuid import UUID, uuid4
 from integrations.models import Message
 from integrations.resolver_context import ResolverUserContext
+from integrations.slack.runtime_wait_tracker import track_runtime_wait
 from integrations.slack.slack_types import SlackViewInterface, StartingConvoException
 from integrations.slack.slack_v1_callback_processor import SlackV1CallbackProcessor
 from integrations.utils import (
@@ -385,21 +386,18 @@ class SlackUpdateExistingConversationView(SlackNewConversationView):
         )

         # Either join ongoing conversation, or restart the conversation
-        agent_loop_info = await conversation_manager.maybe_start_agent_loop(
+        # If runtime is stopped, this will start it in the background
+        _agent_loop_info = await conversation_manager.maybe_start_agent_loop(
             self.conversation_id, conversation_init_data, user_id
         )

-        final_agent_observation = get_final_agent_observation(
-            agent_loop_info.event_store
-        )
-        agent_state = (
-            None
-            if len(final_agent_observation) == 0
-            else final_agent_observation[0].agent_state
-        )
-
-        if not agent_state or agent_state == AgentState.LOADING:
-            raise StartingConvoException('Conversation is still starting')
+        # Wait for the runtime to be ready before sending the message
+        # Slack is asynchronous, so we can afford to wait
+        # Track this wait operation for metrics
+        async with track_runtime_wait():
+            await self._wait_for_runtime_ready(
+                user_id, conversation_init_data, providers_set
+            )

         instructions, _ = self._get_instructions(jinja)
         user_msg = MessageAction(content=instructions)
@@ -520,6 +518,54 @@ class SlackUpdateExistingConversationView(SlackNewConversationView):
         return self.conversation_id

+    async def _wait_for_runtime_ready(
+        self,
+        user_id: str,
+        conversation_init_data,
+        providers_set: list[ProviderType],
+        max_wait_seconds: int = 120,
+        poll_interval_seconds: float = 2.0,
+    ):
+        """Wait for the runtime to be ready before sending a message.
+
+        Since Slack is asynchronous, we can wait for the runtime to come up
+        rather than returning an error to the user.
+        """
+        import asyncio
+
+        from openhands.storage.data_models.conversation_status import ConversationStatus
+
+        start_time = asyncio.get_event_loop().time()
+
+        while True:
+            elapsed = asyncio.get_event_loop().time() - start_time
+            if elapsed >= max_wait_seconds:
+                raise StartingConvoException(
+                    'The conversation is taking too long to start. Please try again later.'
+                )
+
+            agent_loop_info = await conversation_manager.maybe_start_agent_loop(
+                self.conversation_id, conversation_init_data, user_id
+            )
+
+            # Check if runtime is running
+            if agent_loop_info.status == ConversationStatus.RUNNING:
+                # Also verify agent state is ready
+                final_agent_observation = get_final_agent_observation(
+                    agent_loop_info.event_store
+                )
+                agent_state = (
+                    None
+                    if len(final_agent_observation) == 0
+                    else final_agent_observation[0].agent_state
+                )
+
+                if agent_state and agent_state != AgentState.LOADING:
+                    return  # Runtime is ready
+
+            # Wait before polling again
+            await asyncio.sleep(poll_interval_seconds)
+
     def get_response_msg(self):
         user_info: SlackUser = self.slack_to_openhands_user
         conversation_link = CONVERSATION_URL.format(self.conversation_id)
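
The poll-with-deadline pattern that `_wait_for_runtime_ready` follows can be isolated from the Slack and conversation-manager code. This is a minimal sketch under stated assumptions rather than the method from the diff: `wait_until_ready`, `NotReadyInTime`, `is_ready`, and `demo` are hypothetical names, and it uses `asyncio.get_running_loop()` where the diff calls `asyncio.get_event_loop()`.

```python
import asyncio
from typing import Awaitable, Callable


class NotReadyInTime(Exception):
    """Hypothetical timeout error for this sketch."""


async def wait_until_ready(
    is_ready: Callable[[], Awaitable[bool]],
    max_wait_seconds: float = 120,
    poll_interval_seconds: float = 2.0,
) -> int:
    """Poll is_ready() until it returns True; return the number of polls."""
    loop = asyncio.get_running_loop()
    # loop.time() is a monotonic clock, so wall-clock changes do not affect it.
    deadline = loop.time() + max_wait_seconds
    polls = 0
    while True:
        if loop.time() >= deadline:
            raise NotReadyInTime('not ready before the deadline')
        polls += 1
        if await is_ready():
            return polls
        await asyncio.sleep(poll_interval_seconds)


async def demo() -> int:
    # Hypothetical readiness probe: not ready twice, then ready.
    answers = iter([False, False, True])

    async def is_ready() -> bool:
        return next(answers)

    return await wait_until_ready(
        is_ready, max_wait_seconds=1, poll_interval_seconds=0.01
    )
```

With the defaults from the diff (120 seconds, 2-second interval), a loop of this shape makes roughly 60 readiness checks before giving up. The deadline is tested before each poll, so the total wait can overshoot `max_wait_seconds` by at most one poll plus one sleep.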

@@ -217,15 +217,19 @@ class SaasNestedConversationManager(ConversationManager):
             status = ConversationStatus.STARTING

         if status is ConversationStatus.STOPPED:
-            # Mark the agentloop as starting in redis
-            await redis.set(key, 1, ex=_REDIS_ENTRY_TIMEOUT_SECONDS)
+            # Atomically set the key only if it doesn't exist (SETNX)
+            # This prevents race conditions where multiple callers try to start the agent loop
+            was_set = await redis.set(key, 1, ex=_REDIS_ENTRY_TIMEOUT_SECONDS, nx=True)

-            # Start the agent loop in the background
-            asyncio.create_task(
-                self._start_agent_loop(
-                    sid, settings, user_id, initial_user_msg, replay_json
+            # Only start the agent loop if we successfully set the key
+            if was_set:
+                asyncio.create_task(
+                    self._start_agent_loop(
+                        sid, settings, user_id, initial_user_msg, replay_json
+                    )
                 )
-            )
+
+            # If was_set is False, another caller already set the key and is starting the loop
             status = ConversationStatus.STARTING
         return AgentLoopInfo(
             conversation_id=sid,
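
The difference between check-then-set and an atomic set-if-absent can be reproduced without a Redis server. The sketch below rests on stated assumptions: `FakeStore` is a hypothetical in-memory stand-in, not a Redis client, and `start_check_then_set`, `start_set_if_absent`, and `run` are illustrative names that do not appear in the codebase.

```python
import asyncio


class FakeStore:
    """In-memory stand-in for a key-value store (hypothetical, not a Redis client)."""

    def __init__(self) -> None:
        self._data: dict[str, int] = {}

    async def get(self, key: str):
        await asyncio.sleep(0)  # simulate a network round trip
        return self._data.get(key)

    async def set(self, key: str, value: int, nx: bool = False) -> bool:
        await asyncio.sleep(0)  # simulate a network round trip
        # The membership check and the write have no await between them,
        # so no other task can interleave at this point.
        if nx and key in self._data:
            return False
        self._data[key] = value
        return True


async def start_check_then_set(store: FakeStore, key: str, started: list) -> None:
    if await store.get(key) is None:  # check
        await store.set(key, 1)       # set: other tasks may have run in between
        started.append('loop')


async def start_set_if_absent(store: FakeStore, key: str, started: list) -> None:
    if await store.set(key, 1, nx=True):  # check and set in one step
        started.append('loop')


async def run(starter, callers: int = 5) -> int:
    """Launch several concurrent callers and count how many started a loop."""
    store, started = FakeStore(), []
    await asyncio.gather(*(starter(store, 'conv-1', started) for _ in range(callers)))
    return len(started)
```

In this sketch, `asyncio.run(run(start_set_if_absent))` counts a single start, while the check-then-set variant counts more than one because every caller reads the key before any of them writes it.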

@@ -339,3 +339,286 @@ class TestPausedSandboxResumption:
         mock_sandbox_service.resume_sandbox.assert_called_once_with('sandbox-123')
         mock_httpx_client.post.assert_called_once()
         mock_response.raise_for_status.assert_called_once()
+
+
+# ---------------------------------------------------------------------------
+# Test 4: Runtime Ready Wait Logic for V0 Conversations
+# ---------------------------------------------------------------------------
+
+
+class TestRuntimeReadyWait:
+    """Test the _wait_for_runtime_ready method behavior for V0 conversations.
+
+    This tests the fix for issue #12325 where follow-up messages would fail
+    when the runtime was not yet ready (LOADING state).
+    """
+
+    @patch('integrations.slack.slack_view.conversation_manager')
+    @patch('integrations.slack.slack_view.get_final_agent_observation')
+    async def test_wait_for_runtime_ready_immediate_success(
+        self,
+        mock_get_final_agent_observation,
+        mock_conversation_manager,
+        slack_update_conversation_view_v0,
+    ):
+        """Test that _wait_for_runtime_ready returns immediately when runtime is ready."""
+        from openhands.core.schema.agent import AgentState
+        from openhands.storage.data_models.conversation_status import ConversationStatus
+
+        # Setup: Runtime is already running with RUNNING agent state
+        mock_agent_loop_info = MagicMock()
+        mock_agent_loop_info.status = ConversationStatus.RUNNING
+        mock_agent_loop_info.event_store = MagicMock()
+        mock_conversation_manager.maybe_start_agent_loop = AsyncMock(
+            return_value=mock_agent_loop_info
+        )
+
+        mock_observation = MagicMock()
+        mock_observation.agent_state = AgentState.RUNNING
+        mock_get_final_agent_observation.return_value = [mock_observation]
+
+        # Execute
+        await slack_update_conversation_view_v0._wait_for_runtime_ready(
+            user_id='test-user-123',
+            conversation_init_data=MagicMock(),
+            providers_set=[],
+        )
+
+        # Verify: Should only call maybe_start_agent_loop once (no polling needed)
+        assert mock_conversation_manager.maybe_start_agent_loop.call_count == 1
+
+    @patch('integrations.slack.slack_view.conversation_manager')
+    @patch('integrations.slack.slack_view.get_final_agent_observation')
+    async def test_wait_for_runtime_ready_polls_until_ready(
+        self,
+        mock_get_final_agent_observation,
+        mock_conversation_manager,
+        slack_update_conversation_view_v0,
+    ):
+        """Test that _wait_for_runtime_ready polls until runtime becomes ready.
+
+        This is the key fix: instead of raising an error when agent_state is LOADING,
+        we now wait for it to become ready.
+        """
+        from openhands.core.schema.agent import AgentState
+        from openhands.storage.data_models.conversation_status import ConversationStatus
+
+        # Setup: First call returns LOADING, second call returns RUNNING
+        mock_agent_loop_info_loading = MagicMock()
+        mock_agent_loop_info_loading.status = ConversationStatus.RUNNING
+        mock_agent_loop_info_loading.event_store = MagicMock()
+
+        mock_agent_loop_info_running = MagicMock()
+        mock_agent_loop_info_running.status = ConversationStatus.RUNNING
+        mock_agent_loop_info_running.event_store = MagicMock()
+
+        mock_conversation_manager.maybe_start_agent_loop = AsyncMock(
+            side_effect=[mock_agent_loop_info_loading, mock_agent_loop_info_running]
+        )
+
+        # First call: agent is LOADING, second call: agent is RUNNING
+        mock_loading_observation = MagicMock()
+        mock_loading_observation.agent_state = AgentState.LOADING
+        mock_running_observation = MagicMock()
+        mock_running_observation.agent_state = AgentState.RUNNING
+
+        mock_get_final_agent_observation.side_effect = [
+            [mock_loading_observation],
+            [mock_running_observation],
+        ]
+
+        # Execute with short poll interval for faster test
+        await slack_update_conversation_view_v0._wait_for_runtime_ready(
+            user_id='test-user-123',
+            conversation_init_data=MagicMock(),
+            providers_set=[],
+            poll_interval_seconds=0.01,  # Very short for testing
+        )
+
+        # Verify: Should have polled twice
+        assert mock_conversation_manager.maybe_start_agent_loop.call_count == 2
+
+    @patch('integrations.slack.slack_view.conversation_manager')
+    @patch('integrations.slack.slack_view.get_final_agent_observation')
+    async def test_wait_for_runtime_ready_polls_when_starting(
+        self,
+        mock_get_final_agent_observation,
+        mock_conversation_manager,
+        slack_update_conversation_view_v0,
+    ):
+        """Test that _wait_for_runtime_ready polls when conversation status is STARTING."""
+        from openhands.core.schema.agent import AgentState
+        from openhands.storage.data_models.conversation_status import ConversationStatus
+
+        # Setup: First call returns STARTING status, second call returns RUNNING
+        mock_agent_loop_info_starting = MagicMock()
+        mock_agent_loop_info_starting.status = ConversationStatus.STARTING
+        mock_agent_loop_info_starting.event_store = MagicMock()
+
+        mock_agent_loop_info_running = MagicMock()
+        mock_agent_loop_info_running.status = ConversationStatus.RUNNING
+        mock_agent_loop_info_running.event_store = MagicMock()
+
+        mock_conversation_manager.maybe_start_agent_loop = AsyncMock(
+            side_effect=[mock_agent_loop_info_starting, mock_agent_loop_info_running]
+        )
+
+        mock_running_observation = MagicMock()
+        mock_running_observation.agent_state = AgentState.RUNNING
+        mock_get_final_agent_observation.return_value = [mock_running_observation]
+
+        # Execute
+        await slack_update_conversation_view_v0._wait_for_runtime_ready(
+            user_id='test-user-123',
+            conversation_init_data=MagicMock(),
+            providers_set=[],
+            poll_interval_seconds=0.01,
+        )
+
+        # Verify: Should have polled twice
+        assert mock_conversation_manager.maybe_start_agent_loop.call_count == 2
+
+    @patch('integrations.slack.slack_view.conversation_manager')
+    @patch('integrations.slack.slack_view.get_final_agent_observation')
+    async def test_wait_for_runtime_ready_timeout(
+        self,
+        mock_get_final_agent_observation,
+        mock_conversation_manager,
+        slack_update_conversation_view_v0,
+    ):
+        """Test that _wait_for_runtime_ready raises exception after timeout."""
+        from integrations.slack.slack_types import StartingConvoException
+
+        from openhands.core.schema.agent import AgentState
+        from openhands.storage.data_models.conversation_status import ConversationStatus
+
+        # Setup: Always return LOADING state
+        mock_agent_loop_info = MagicMock()
+        mock_agent_loop_info.status = ConversationStatus.RUNNING
+        mock_agent_loop_info.event_store = MagicMock()
+        mock_conversation_manager.maybe_start_agent_loop = AsyncMock(
+            return_value=mock_agent_loop_info
+        )
+
+        mock_loading_observation = MagicMock()
+        mock_loading_observation.agent_state = AgentState.LOADING
+        mock_get_final_agent_observation.return_value = [mock_loading_observation]
+
+        # Execute with very short timeout
+        with pytest.raises(StartingConvoException) as exc_info:
+            await slack_update_conversation_view_v0._wait_for_runtime_ready(
+                user_id='test-user-123',
+                conversation_init_data=MagicMock(),
+                providers_set=[],
+                max_wait_seconds=0.05,  # Very short timeout for testing
+                poll_interval_seconds=0.01,
+            )
+
+        # Verify error message
+        assert 'taking too long to start' in str(exc_info.value)
+
+    @patch('integrations.slack.slack_view.conversation_manager')
+    @patch('integrations.slack.slack_view.get_final_agent_observation')
+    async def test_wait_for_runtime_ready_handles_empty_observations(
+        self,
+        mock_get_final_agent_observation,
+        mock_conversation_manager,
+        slack_update_conversation_view_v0,
+    ):
+        """Test that _wait_for_runtime_ready handles empty observations gracefully."""
+        from openhands.core.schema.agent import AgentState
+        from openhands.storage.data_models.conversation_status import ConversationStatus
+
+        # Setup: First call returns empty observations, second call returns valid observation
+        mock_agent_loop_info = MagicMock()
+        mock_agent_loop_info.status = ConversationStatus.RUNNING
+        mock_agent_loop_info.event_store = MagicMock()
+        mock_conversation_manager.maybe_start_agent_loop = AsyncMock(
+            return_value=mock_agent_loop_info
+        )
+
+        mock_running_observation = MagicMock()
+        mock_running_observation.agent_state = AgentState.RUNNING
+
+        # First call: empty observations, second call: valid observation
+        mock_get_final_agent_observation.side_effect = [[], [mock_running_observation]]
+
+        # Execute
+        await slack_update_conversation_view_v0._wait_for_runtime_ready(
+            user_id='test-user-123',
+            conversation_init_data=MagicMock(),
+            providers_set=[],
+            poll_interval_seconds=0.01,
+        )
+
+        # Verify: Should have polled twice (empty observations means not ready)
+        assert mock_conversation_manager.maybe_start_agent_loop.call_count == 2
+
+
+# ---------------------------------------------------------------------------
+# Test 5: Runtime Wait Tracker (Concurrency Limits and Metrics)
+# ---------------------------------------------------------------------------
+
+
+class TestRuntimeWaitTracker:
+    """Test the runtime wait tracker for metrics."""
+
+    async def test_track_runtime_wait_increments_and_decrements(self):
+        """Test that the tracker correctly increments and decrements the counter."""
+        from integrations.slack.runtime_wait_tracker import (
+            get_current_wait_count,
+            track_runtime_wait,
+        )
+
+        initial_count = get_current_wait_count()
+
+        async with track_runtime_wait():
+            assert get_current_wait_count() == initial_count + 1
+
+        assert get_current_wait_count() == initial_count
+
+    async def test_track_runtime_wait_decrements_on_exception(self):
+        """Test that the tracker decrements even when an exception is raised."""
+        from integrations.slack.runtime_wait_tracker import (
+            get_current_wait_count,
+            track_runtime_wait,
+        )
+
+        initial_count = get_current_wait_count()
+        exception_raised = False
+
+        try:
+            async with track_runtime_wait():
+                assert get_current_wait_count() == initial_count + 1
+                raise ValueError('Test exception')
+        except ValueError:
+            exception_raised = True
+
+        assert exception_raised
+        assert get_current_wait_count() == initial_count
+
+    async def test_concurrent_waits_tracked_correctly(self):
+        """Test that multiple concurrent waits are tracked correctly."""
+        import asyncio
+
+        from integrations.slack.runtime_wait_tracker import (
+            get_current_wait_count,
+            track_runtime_wait,
+        )
+
+        initial_count = get_current_wait_count()
+        results = []
+
+        async def wait_and_record():
+            async with track_runtime_wait():
+                results.append(get_current_wait_count())
+                await asyncio.sleep(0.01)
+
+        # Run 3 concurrent waits
+        await asyncio.gather(wait_and_record(), wait_and_record(), wait_and_record())
+
+        # All three should have seen counts >= initial + 1
+        # (exact values depend on timing, but all should be > initial)
+        assert all(r > initial_count for r in results)
+        assert get_current_wait_count() == initial_count
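
The polling tests above drive the code under test by giving `AsyncMock` a `side_effect` list, so each awaited call returns the next scripted value. A self-contained sketch of that technique follows, using only the standard library. `poll_until` is a hypothetical function under test and is not part of this change.

```python
import asyncio
from unittest.mock import AsyncMock


async def poll_until(check, interval: float = 0.01, max_polls: int = 10) -> bool:
    """Hypothetical function under test: await check() until it returns True."""
    for _ in range(max_polls):
        if await check():
            return True
        await asyncio.sleep(interval)
    return False


def test_polls_until_ready() -> None:
    # A side_effect list yields one value per call, in order.
    check = AsyncMock(side_effect=[False, False, True])
    assert asyncio.run(poll_until(check)) is True
    assert check.await_count == 3


def test_gives_up_after_max_polls() -> None:
    # return_value repeats forever, which models a runtime that never becomes ready.
    check = AsyncMock(return_value=False)
    assert asyncio.run(poll_until(check, max_polls=4)) is False
    assert check.await_count == 4
```

Counting awaits on the mock mirrors the `call_count == 2` assertions in the diff: it confirms that the loop polled exactly as many times as the scripted sequence required.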