mirror of
https://github.com/All-Hands-AI/OpenHands.git
synced 2026-04-29 03:00:45 -04:00
Compare commits
24 Commits
ci/pr-amd6
...
fix/slack-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e426ce2138 | ||
|
|
92f90b633c | ||
|
|
ea1e9dab9c | ||
|
|
44be37b8b5 | ||
|
|
0c4d73f33a | ||
|
|
64492d35bf | ||
|
|
75928ebab0 | ||
|
|
b7e5902008 | ||
|
|
8fba1dd5c0 | ||
|
|
8a7a615f33 | ||
|
|
87a10b3c24 | ||
|
|
558a9649d4 | ||
|
|
bed46a0d49 | ||
|
|
c0c4efe77a | ||
|
|
fe0cb83cec | ||
|
|
7a244f24ac | ||
|
|
c12790bd1a | ||
|
|
d098cfa068 | ||
|
|
f4299afb3e | ||
|
|
36f6e231ec | ||
|
|
5e2ca065e6 | ||
|
|
44dde42457 | ||
|
|
9c6fe05586 | ||
|
|
abdd41054b |
67
enterprise/integrations/slack/runtime_wait_tracker.py
Normal file
67
enterprise/integrations/slack/runtime_wait_tracker.py
Normal file
@@ -0,0 +1,67 @@
|
||||
"""Tracks concurrent runtime wait operations for Slack follow-up messages.
|
||||
|
||||
This module provides Prometheus metrics for monitoring wait operations in production.
|
||||
"""
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from prometheus_client import Counter, Gauge
|
||||
|
||||
from openhands.core.logger import openhands_logger as logger
|
||||
|
||||
# Prometheus metrics
|
||||
RUNTIME_WAIT_CURRENT = Gauge(
|
||||
'slack_runtime_wait_current',
|
||||
'Current number of Slack follow-up messages waiting for runtime to be ready',
|
||||
)
|
||||
|
||||
RUNTIME_WAIT_TOTAL = Counter(
|
||||
'slack_runtime_wait_total',
|
||||
'Total number of Slack follow-up messages that started waiting for runtime',
|
||||
)
|
||||
|
||||
RUNTIME_WAIT_COMPLETED = Counter(
|
||||
'slack_runtime_wait_completed',
|
||||
'Total number of Slack follow-up messages that completed waiting for runtime',
|
||||
['status'], # 'success', 'timeout', 'error'
|
||||
)
|
||||
|
||||
# Internal counter for tracking concurrent waits
|
||||
_current_wait_count = 0
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def track_runtime_wait():
|
||||
"""Context manager to track runtime wait operations.
|
||||
|
||||
Usage:
|
||||
async with track_runtime_wait():
|
||||
await _wait_for_runtime_ready(...)
|
||||
"""
|
||||
global _current_wait_count
|
||||
|
||||
_current_wait_count += 1
|
||||
RUNTIME_WAIT_CURRENT.set(_current_wait_count)
|
||||
RUNTIME_WAIT_TOTAL.inc()
|
||||
|
||||
logger.info(f'Slack runtime wait started: {_current_wait_count} now waiting')
|
||||
|
||||
try:
|
||||
yield
|
||||
RUNTIME_WAIT_COMPLETED.labels(status='success').inc()
|
||||
except Exception as e:
|
||||
# Determine if this was a timeout or other error
|
||||
if 'taking too long' in str(e):
|
||||
RUNTIME_WAIT_COMPLETED.labels(status='timeout').inc()
|
||||
else:
|
||||
RUNTIME_WAIT_COMPLETED.labels(status='error').inc()
|
||||
raise
|
||||
finally:
|
||||
_current_wait_count -= 1
|
||||
RUNTIME_WAIT_CURRENT.set(_current_wait_count)
|
||||
logger.info(f'Slack runtime wait ended: {_current_wait_count} now waiting')
|
||||
|
||||
|
||||
def get_current_wait_count() -> int:
|
||||
"""Get the current number of waiting coroutines (for testing/debugging)."""
|
||||
return _current_wait_count
|
||||
@@ -3,6 +3,7 @@ from uuid import UUID, uuid4
|
||||
|
||||
from integrations.models import Message
|
||||
from integrations.resolver_context import ResolverUserContext
|
||||
from integrations.slack.runtime_wait_tracker import track_runtime_wait
|
||||
from integrations.slack.slack_types import SlackViewInterface, StartingConvoException
|
||||
from integrations.slack.slack_v1_callback_processor import SlackV1CallbackProcessor
|
||||
from integrations.utils import (
|
||||
@@ -385,21 +386,18 @@ class SlackUpdateExistingConversationView(SlackNewConversationView):
|
||||
)
|
||||
|
||||
# Either join ongoing conversation, or restart the conversation
|
||||
agent_loop_info = await conversation_manager.maybe_start_agent_loop(
|
||||
# If runtime is stopped, this will start it in the background
|
||||
_agent_loop_info = await conversation_manager.maybe_start_agent_loop(
|
||||
self.conversation_id, conversation_init_data, user_id
|
||||
)
|
||||
|
||||
final_agent_observation = get_final_agent_observation(
|
||||
agent_loop_info.event_store
|
||||
)
|
||||
agent_state = (
|
||||
None
|
||||
if len(final_agent_observation) == 0
|
||||
else final_agent_observation[0].agent_state
|
||||
)
|
||||
|
||||
if not agent_state or agent_state == AgentState.LOADING:
|
||||
raise StartingConvoException('Conversation is still starting')
|
||||
# Wait for the runtime to be ready before sending the message
|
||||
# Slack is asynchronous, so we can afford to wait
|
||||
# Track this wait operation for metrics
|
||||
async with track_runtime_wait():
|
||||
await self._wait_for_runtime_ready(
|
||||
user_id, conversation_init_data, providers_set
|
||||
)
|
||||
|
||||
instructions, _ = self._get_instructions(jinja)
|
||||
user_msg = MessageAction(content=instructions)
|
||||
@@ -520,6 +518,54 @@ class SlackUpdateExistingConversationView(SlackNewConversationView):
|
||||
|
||||
return self.conversation_id
|
||||
|
||||
async def _wait_for_runtime_ready(
|
||||
self,
|
||||
user_id: str,
|
||||
conversation_init_data,
|
||||
providers_set: list[ProviderType],
|
||||
max_wait_seconds: int = 120,
|
||||
poll_interval_seconds: float = 2.0,
|
||||
):
|
||||
"""Wait for the runtime to be ready before sending a message.
|
||||
|
||||
Since Slack is asynchronous, we can wait for the runtime to come up
|
||||
rather than returning an error to the user.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
from openhands.storage.data_models.conversation_status import ConversationStatus
|
||||
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
|
||||
while True:
|
||||
elapsed = asyncio.get_event_loop().time() - start_time
|
||||
if elapsed >= max_wait_seconds:
|
||||
raise StartingConvoException(
|
||||
'The conversation is taking too long to start. Please try again later.'
|
||||
)
|
||||
|
||||
agent_loop_info = await conversation_manager.maybe_start_agent_loop(
|
||||
self.conversation_id, conversation_init_data, user_id
|
||||
)
|
||||
|
||||
# Check if runtime is running
|
||||
if agent_loop_info.status == ConversationStatus.RUNNING:
|
||||
# Also verify agent state is ready
|
||||
final_agent_observation = get_final_agent_observation(
|
||||
agent_loop_info.event_store
|
||||
)
|
||||
agent_state = (
|
||||
None
|
||||
if len(final_agent_observation) == 0
|
||||
else final_agent_observation[0].agent_state
|
||||
)
|
||||
|
||||
if agent_state and agent_state != AgentState.LOADING:
|
||||
return # Runtime is ready
|
||||
|
||||
# Wait before polling again
|
||||
await asyncio.sleep(poll_interval_seconds)
|
||||
|
||||
def get_response_msg(self):
|
||||
user_info: SlackUser = self.slack_to_openhands_user
|
||||
conversation_link = CONVERSATION_URL.format(self.conversation_id)
|
||||
|
||||
@@ -217,15 +217,19 @@ class SaasNestedConversationManager(ConversationManager):
|
||||
status = ConversationStatus.STARTING
|
||||
|
||||
if status is ConversationStatus.STOPPED:
|
||||
# Mark the agentloop as starting in redis
|
||||
await redis.set(key, 1, ex=_REDIS_ENTRY_TIMEOUT_SECONDS)
|
||||
# Atomically set the key only if it doesn't exist (SETNX)
|
||||
# This prevents race conditions where multiple callers try to start the agent loop
|
||||
was_set = await redis.set(key, 1, ex=_REDIS_ENTRY_TIMEOUT_SECONDS, nx=True)
|
||||
|
||||
# Start the agent loop in the background
|
||||
asyncio.create_task(
|
||||
self._start_agent_loop(
|
||||
sid, settings, user_id, initial_user_msg, replay_json
|
||||
# Only start the agent loop if we successfully set the key
|
||||
if was_set:
|
||||
asyncio.create_task(
|
||||
self._start_agent_loop(
|
||||
sid, settings, user_id, initial_user_msg, replay_json
|
||||
)
|
||||
)
|
||||
)
|
||||
# If was_set is False, another caller already set the key and is starting the loop
|
||||
status = ConversationStatus.STARTING
|
||||
|
||||
return AgentLoopInfo(
|
||||
conversation_id=sid,
|
||||
|
||||
@@ -339,3 +339,286 @@ class TestPausedSandboxResumption:
|
||||
mock_sandbox_service.resume_sandbox.assert_called_once_with('sandbox-123')
|
||||
mock_httpx_client.post.assert_called_once()
|
||||
mock_response.raise_for_status.assert_called_once()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test 4: Runtime Ready Wait Logic for V0 Conversations
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRuntimeReadyWait:
|
||||
"""Test the _wait_for_runtime_ready method behavior for V0 conversations.
|
||||
|
||||
This tests the fix for issue #12325 where follow-up messages would fail
|
||||
when the runtime was not yet ready (LOADING state).
|
||||
"""
|
||||
|
||||
@patch('integrations.slack.slack_view.conversation_manager')
|
||||
@patch('integrations.slack.slack_view.get_final_agent_observation')
|
||||
async def test_wait_for_runtime_ready_immediate_success(
|
||||
self,
|
||||
mock_get_final_agent_observation,
|
||||
mock_conversation_manager,
|
||||
slack_update_conversation_view_v0,
|
||||
):
|
||||
"""Test that _wait_for_runtime_ready returns immediately when runtime is ready."""
|
||||
from openhands.core.schema.agent import AgentState
|
||||
from openhands.storage.data_models.conversation_status import ConversationStatus
|
||||
|
||||
# Setup: Runtime is already running with RUNNING agent state
|
||||
mock_agent_loop_info = MagicMock()
|
||||
mock_agent_loop_info.status = ConversationStatus.RUNNING
|
||||
mock_agent_loop_info.event_store = MagicMock()
|
||||
mock_conversation_manager.maybe_start_agent_loop = AsyncMock(
|
||||
return_value=mock_agent_loop_info
|
||||
)
|
||||
|
||||
mock_observation = MagicMock()
|
||||
mock_observation.agent_state = AgentState.RUNNING
|
||||
mock_get_final_agent_observation.return_value = [mock_observation]
|
||||
|
||||
# Execute
|
||||
await slack_update_conversation_view_v0._wait_for_runtime_ready(
|
||||
user_id='test-user-123',
|
||||
conversation_init_data=MagicMock(),
|
||||
providers_set=[],
|
||||
)
|
||||
|
||||
# Verify: Should only call maybe_start_agent_loop once (no polling needed)
|
||||
assert mock_conversation_manager.maybe_start_agent_loop.call_count == 1
|
||||
|
||||
@patch('integrations.slack.slack_view.conversation_manager')
|
||||
@patch('integrations.slack.slack_view.get_final_agent_observation')
|
||||
async def test_wait_for_runtime_ready_polls_until_ready(
|
||||
self,
|
||||
mock_get_final_agent_observation,
|
||||
mock_conversation_manager,
|
||||
slack_update_conversation_view_v0,
|
||||
):
|
||||
"""Test that _wait_for_runtime_ready polls until runtime becomes ready.
|
||||
|
||||
This is the key fix: instead of raising an error when agent_state is LOADING,
|
||||
we now wait for it to become ready.
|
||||
"""
|
||||
from openhands.core.schema.agent import AgentState
|
||||
from openhands.storage.data_models.conversation_status import ConversationStatus
|
||||
|
||||
# Setup: First call returns LOADING, second call returns RUNNING
|
||||
mock_agent_loop_info_loading = MagicMock()
|
||||
mock_agent_loop_info_loading.status = ConversationStatus.RUNNING
|
||||
mock_agent_loop_info_loading.event_store = MagicMock()
|
||||
|
||||
mock_agent_loop_info_running = MagicMock()
|
||||
mock_agent_loop_info_running.status = ConversationStatus.RUNNING
|
||||
mock_agent_loop_info_running.event_store = MagicMock()
|
||||
|
||||
mock_conversation_manager.maybe_start_agent_loop = AsyncMock(
|
||||
side_effect=[mock_agent_loop_info_loading, mock_agent_loop_info_running]
|
||||
)
|
||||
|
||||
# First call: agent is LOADING, second call: agent is RUNNING
|
||||
mock_loading_observation = MagicMock()
|
||||
mock_loading_observation.agent_state = AgentState.LOADING
|
||||
|
||||
mock_running_observation = MagicMock()
|
||||
mock_running_observation.agent_state = AgentState.RUNNING
|
||||
|
||||
mock_get_final_agent_observation.side_effect = [
|
||||
[mock_loading_observation],
|
||||
[mock_running_observation],
|
||||
]
|
||||
|
||||
# Execute with short poll interval for faster test
|
||||
await slack_update_conversation_view_v0._wait_for_runtime_ready(
|
||||
user_id='test-user-123',
|
||||
conversation_init_data=MagicMock(),
|
||||
providers_set=[],
|
||||
poll_interval_seconds=0.01, # Very short for testing
|
||||
)
|
||||
|
||||
# Verify: Should have polled twice
|
||||
assert mock_conversation_manager.maybe_start_agent_loop.call_count == 2
|
||||
|
||||
@patch('integrations.slack.slack_view.conversation_manager')
|
||||
@patch('integrations.slack.slack_view.get_final_agent_observation')
|
||||
async def test_wait_for_runtime_ready_polls_when_starting(
|
||||
self,
|
||||
mock_get_final_agent_observation,
|
||||
mock_conversation_manager,
|
||||
slack_update_conversation_view_v0,
|
||||
):
|
||||
"""Test that _wait_for_runtime_ready polls when conversation status is STARTING."""
|
||||
from openhands.core.schema.agent import AgentState
|
||||
from openhands.storage.data_models.conversation_status import ConversationStatus
|
||||
|
||||
# Setup: First call returns STARTING status, second call returns RUNNING
|
||||
mock_agent_loop_info_starting = MagicMock()
|
||||
mock_agent_loop_info_starting.status = ConversationStatus.STARTING
|
||||
mock_agent_loop_info_starting.event_store = MagicMock()
|
||||
|
||||
mock_agent_loop_info_running = MagicMock()
|
||||
mock_agent_loop_info_running.status = ConversationStatus.RUNNING
|
||||
mock_agent_loop_info_running.event_store = MagicMock()
|
||||
|
||||
mock_conversation_manager.maybe_start_agent_loop = AsyncMock(
|
||||
side_effect=[mock_agent_loop_info_starting, mock_agent_loop_info_running]
|
||||
)
|
||||
|
||||
mock_running_observation = MagicMock()
|
||||
mock_running_observation.agent_state = AgentState.RUNNING
|
||||
mock_get_final_agent_observation.return_value = [mock_running_observation]
|
||||
|
||||
# Execute
|
||||
await slack_update_conversation_view_v0._wait_for_runtime_ready(
|
||||
user_id='test-user-123',
|
||||
conversation_init_data=MagicMock(),
|
||||
providers_set=[],
|
||||
poll_interval_seconds=0.01,
|
||||
)
|
||||
|
||||
# Verify: Should have polled twice
|
||||
assert mock_conversation_manager.maybe_start_agent_loop.call_count == 2
|
||||
|
||||
@patch('integrations.slack.slack_view.conversation_manager')
|
||||
@patch('integrations.slack.slack_view.get_final_agent_observation')
|
||||
async def test_wait_for_runtime_ready_timeout(
|
||||
self,
|
||||
mock_get_final_agent_observation,
|
||||
mock_conversation_manager,
|
||||
slack_update_conversation_view_v0,
|
||||
):
|
||||
"""Test that _wait_for_runtime_ready raises exception after timeout."""
|
||||
from integrations.slack.slack_types import StartingConvoException
|
||||
|
||||
from openhands.core.schema.agent import AgentState
|
||||
from openhands.storage.data_models.conversation_status import ConversationStatus
|
||||
|
||||
# Setup: Always return LOADING state
|
||||
mock_agent_loop_info = MagicMock()
|
||||
mock_agent_loop_info.status = ConversationStatus.RUNNING
|
||||
mock_agent_loop_info.event_store = MagicMock()
|
||||
mock_conversation_manager.maybe_start_agent_loop = AsyncMock(
|
||||
return_value=mock_agent_loop_info
|
||||
)
|
||||
|
||||
mock_loading_observation = MagicMock()
|
||||
mock_loading_observation.agent_state = AgentState.LOADING
|
||||
mock_get_final_agent_observation.return_value = [mock_loading_observation]
|
||||
|
||||
# Execute with very short timeout
|
||||
with pytest.raises(StartingConvoException) as exc_info:
|
||||
await slack_update_conversation_view_v0._wait_for_runtime_ready(
|
||||
user_id='test-user-123',
|
||||
conversation_init_data=MagicMock(),
|
||||
providers_set=[],
|
||||
max_wait_seconds=0.05, # Very short timeout for testing
|
||||
poll_interval_seconds=0.01,
|
||||
)
|
||||
|
||||
# Verify error message
|
||||
assert 'taking too long to start' in str(exc_info.value)
|
||||
|
||||
@patch('integrations.slack.slack_view.conversation_manager')
|
||||
@patch('integrations.slack.slack_view.get_final_agent_observation')
|
||||
async def test_wait_for_runtime_ready_handles_empty_observations(
|
||||
self,
|
||||
mock_get_final_agent_observation,
|
||||
mock_conversation_manager,
|
||||
slack_update_conversation_view_v0,
|
||||
):
|
||||
"""Test that _wait_for_runtime_ready handles empty observations gracefully."""
|
||||
from openhands.core.schema.agent import AgentState
|
||||
from openhands.storage.data_models.conversation_status import ConversationStatus
|
||||
|
||||
# Setup: First call returns empty observations, second call returns valid observation
|
||||
mock_agent_loop_info = MagicMock()
|
||||
mock_agent_loop_info.status = ConversationStatus.RUNNING
|
||||
mock_agent_loop_info.event_store = MagicMock()
|
||||
mock_conversation_manager.maybe_start_agent_loop = AsyncMock(
|
||||
return_value=mock_agent_loop_info
|
||||
)
|
||||
|
||||
mock_running_observation = MagicMock()
|
||||
mock_running_observation.agent_state = AgentState.RUNNING
|
||||
|
||||
# First call: empty observations, second call: valid observation
|
||||
mock_get_final_agent_observation.side_effect = [[], [mock_running_observation]]
|
||||
|
||||
# Execute
|
||||
await slack_update_conversation_view_v0._wait_for_runtime_ready(
|
||||
user_id='test-user-123',
|
||||
conversation_init_data=MagicMock(),
|
||||
providers_set=[],
|
||||
poll_interval_seconds=0.01,
|
||||
)
|
||||
|
||||
# Verify: Should have polled twice (empty observations means not ready)
|
||||
assert mock_conversation_manager.maybe_start_agent_loop.call_count == 2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test 5: Runtime Wait Tracker (Concurrency Limits and Metrics)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRuntimeWaitTracker:
|
||||
"""Test the runtime wait tracker for metrics."""
|
||||
|
||||
async def test_track_runtime_wait_increments_and_decrements(self):
|
||||
"""Test that the tracker correctly increments and decrements the counter."""
|
||||
from integrations.slack.runtime_wait_tracker import (
|
||||
get_current_wait_count,
|
||||
track_runtime_wait,
|
||||
)
|
||||
|
||||
initial_count = get_current_wait_count()
|
||||
|
||||
async with track_runtime_wait():
|
||||
assert get_current_wait_count() == initial_count + 1
|
||||
|
||||
assert get_current_wait_count() == initial_count
|
||||
|
||||
async def test_track_runtime_wait_decrements_on_exception(self):
|
||||
"""Test that the tracker decrements even when an exception is raised."""
|
||||
from integrations.slack.runtime_wait_tracker import (
|
||||
get_current_wait_count,
|
||||
track_runtime_wait,
|
||||
)
|
||||
|
||||
initial_count = get_current_wait_count()
|
||||
exception_raised = False
|
||||
|
||||
try:
|
||||
async with track_runtime_wait():
|
||||
assert get_current_wait_count() == initial_count + 1
|
||||
raise ValueError('Test exception')
|
||||
except ValueError:
|
||||
exception_raised = True
|
||||
|
||||
assert exception_raised
|
||||
assert get_current_wait_count() == initial_count
|
||||
|
||||
async def test_concurrent_waits_tracked_correctly(self):
|
||||
"""Test that multiple concurrent waits are tracked correctly."""
|
||||
import asyncio
|
||||
|
||||
from integrations.slack.runtime_wait_tracker import (
|
||||
get_current_wait_count,
|
||||
track_runtime_wait,
|
||||
)
|
||||
|
||||
initial_count = get_current_wait_count()
|
||||
results = []
|
||||
|
||||
async def wait_and_record():
|
||||
async with track_runtime_wait():
|
||||
results.append(get_current_wait_count())
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
# Run 3 concurrent waits
|
||||
await asyncio.gather(wait_and_record(), wait_and_record(), wait_and_record())
|
||||
|
||||
# All three should have seen counts >= initial + 1
|
||||
# (exact values depend on timing, but all should be > initial)
|
||||
assert all(r > initial_count for r in results)
|
||||
assert get_current_wait_count() == initial_count
|
||||
|
||||
Reference in New Issue
Block a user