Remove dead code from enterprise/integrations/utils.py (#14161)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
Tim O'Farrell
2026-04-27 12:11:29 -06:00
committed by GitHub
parent 273c38f0b6
commit 5bb6522f2f
2 changed files with 3 additions and 510 deletions

View File

@@ -1,22 +1,11 @@
from __future__ import annotations
import json
import os
import re
from jinja2 import Environment, FileSystemLoader
from server.constants import WEB_HOST
from openhands.core.logger import openhands_logger as logger
from openhands.core.schema.agent import AgentState
from openhands.events import Event, EventSource
from openhands.events.action import (
AgentFinishAction,
MessageAction,
)
from openhands.events.event_filter import EventFilter
from openhands.events.event_store_abc import EventStoreABC
from openhands.events.observation.agent import AgentStateChangedObservation
from openhands.integrations.service_types import Repository
# ---- DO NOT REMOVE ----
@@ -26,10 +15,8 @@ HOST = WEB_HOST
IS_LOCAL_DEPLOYMENT = 'localhost' in HOST
HOST_URL = f'https://{HOST}' if not IS_LOCAL_DEPLOYMENT else f'http://{HOST}'
GITHUB_WEBHOOK_URL = f'{HOST_URL}/integration/github/events'
GITLAB_WEBHOOK_URL = f'{HOST_URL}/integration/gitlab/events'
conversation_prefix = 'conversations/{}'
CONVERSATION_URL = f'{HOST_URL}/{conversation_prefix}'
CONVERSATION_URL = f'{HOST_URL}/conversations/{{}}'
# Toggle for auto-response feature that proactively starts conversations with users when workflow tests fail
ENABLE_PROACTIVE_CONVERSATION_STARTERS = (
@@ -80,7 +67,7 @@ OPENHANDS_RESOLVER_TEMPLATES_DIR = (
os.getenv('OPENHANDS_RESOLVER_TEMPLATES_DIR')
or 'openhands/integrations/templates/resolver/'
)
jinja_env = Environment(loader=FileSystemLoader(OPENHANDS_RESOLVER_TEMPLATES_DIR))
_jinja_env = Environment(loader=FileSystemLoader(OPENHANDS_RESOLVER_TEMPLATES_DIR))
def get_oh_labels(web_host: str) -> tuple[str, str]:
@@ -102,7 +89,7 @@ def get_oh_labels(web_host: str) -> tuple[str, str]:
def get_summary_instruction():
summary_instruction_template = jinja_env.get_template('summary_prompt.j2')
summary_instruction_template = _jinja_env.get_template('summary_prompt.j2')
summary_instruction = summary_instruction_template.render()
return summary_instruction
@@ -133,205 +120,6 @@ def has_exact_mention(text: str, mention: str) -> bool:
return bool(re.search(rf'(?:^|[^\w@]){pattern}(?![\w-])', text_lower))
def confirm_event_type(event: Event):
return isinstance(event, AgentStateChangedObservation) and not (
event.agent_state == AgentState.REJECTED
or event.agent_state == AgentState.USER_CONFIRMED
or event.agent_state == AgentState.USER_REJECTED
or event.agent_state == AgentState.LOADING
or event.agent_state == AgentState.RUNNING
)
def get_readable_error_reason(reason: str):
if reason == 'STATUS$ERROR_LLM_AUTHENTICATION':
reason = 'Authentication with the LLM provider failed. Please check your API key or credentials'
elif reason == 'STATUS$ERROR_LLM_SERVICE_UNAVAILABLE':
reason = 'The LLM service is temporarily unavailable. Please try again later'
elif reason == 'STATUS$ERROR_LLM_INTERNAL_SERVER_ERROR':
reason = 'The LLM provider encountered an internal error. Please try again soon'
elif reason == 'STATUS$ERROR_LLM_OUT_OF_CREDITS':
reason = "You've run out of credits. Please top up to continue"
elif reason == 'STATUS$ERROR_LLM_CONTENT_POLICY_VIOLATION':
reason = 'Content policy violation. The output was blocked by content filtering policy'
return reason
def get_summary_for_agent_state(
observations: list[AgentStateChangedObservation], conversation_link: str
) -> str:
unknown_error_msg = f'OpenHands encountered an unknown error. [See the conversation]({conversation_link}) for more information, or try again'
if len(observations) == 0:
logger.error(
'Unknown error: No agent state observations found',
extra={'conversation_link': conversation_link},
)
return unknown_error_msg
observation: AgentStateChangedObservation = observations[0]
state = observation.agent_state
if state == AgentState.RATE_LIMITED:
logger.warning(
'Agent was rate limited',
extra={
'agent_state': state.value,
'conversation_link': conversation_link,
'observation_reason': getattr(observation, 'reason', None),
},
)
return 'OpenHands was rate limited by the LLM provider. Please try again later.'
if state == AgentState.ERROR:
reason = observation.reason
reason = get_readable_error_reason(reason)
logger.error(
'Agent encountered an error',
extra={
'agent_state': state.value,
'conversation_link': conversation_link,
'observation_reason': observation.reason,
'readable_reason': reason,
},
)
return f'OpenHands encountered an error: **{reason}**.\n\n[See the conversation]({conversation_link}) for more information.'
if state == AgentState.AWAITING_USER_INPUT:
logger.info(
'Agent is awaiting user input',
extra={
'agent_state': state.value,
'conversation_link': conversation_link,
'observation_reason': getattr(observation, 'reason', None),
},
)
return f'OpenHands is waiting for your input. [Continue the conversation]({conversation_link}) to provide additional instructions.'
# Log unknown agent state as error
logger.error(
'Unknown error: Unhandled agent state',
extra={
'agent_state': state.value if hasattr(state, 'value') else str(state),
'conversation_link': conversation_link,
'observation_reason': getattr(observation, 'reason', None),
},
)
return unknown_error_msg
def get_final_agent_observation(
event_store: EventStoreABC,
) -> list[AgentStateChangedObservation]:
events = list(
event_store.search_events(
filter=EventFilter(
source=EventSource.ENVIRONMENT,
include_types=(AgentStateChangedObservation,),
),
limit=1,
reverse=True,
)
)
result = [e for e in events if isinstance(e, AgentStateChangedObservation)]
assert len(result) == len(events)
return result
def get_last_user_msg(event_store: EventStoreABC) -> list[MessageAction]:
events = list(
event_store.search_events(
filter=EventFilter(
source=EventSource.USER,
include_types=(MessageAction,),
),
limit=1,
reverse=True,
)
)
result = [e for e in events if isinstance(e, MessageAction)]
assert len(result) == len(events)
return result
def extract_summary_from_event_store(
event_store: EventStoreABC, conversation_id: str
) -> str:
"""
Get agent summary or alternative message depending on current AgentState
"""
conversation_link = CONVERSATION_URL.format(conversation_id)
summary_instruction = get_summary_instruction()
instruction_events = list(
event_store.search_events(
filter=EventFilter(
query=json.dumps(summary_instruction),
source=EventSource.USER,
include_types=(MessageAction,),
),
limit=1,
reverse=True,
)
)
final_agent_observation = get_final_agent_observation(event_store)
# Find summary instruction event ID
if not instruction_events:
logger.warning(
'no_instruction_event_found', extra={'conversation_id': conversation_id}
)
return get_summary_for_agent_state(
final_agent_observation, conversation_link
) # Agent did not receive summary instruction
summary_events = list(
event_store.search_events(
filter=EventFilter(
source=EventSource.AGENT,
include_types=(MessageAction, AgentFinishAction),
),
limit=1,
reverse=True,
start_id=instruction_events[0].id,
)
)
if not summary_events:
logger.warning(
'no_agent_messages_found', extra={'conversation_id': conversation_id}
)
return get_summary_for_agent_state(
final_agent_observation, conversation_link
) # Agent failed to generate summary
summary_event = summary_events[0]
if isinstance(summary_event, MessageAction):
return summary_event.content
assert isinstance(summary_event, AgentFinishAction)
return summary_event.final_thought
def append_conversation_footer(message: str, conversation_id: str) -> str:
"""
Append a small footer with the conversation URL to a message.
Args:
message: The original message content
conversation_id: The conversation ID to link to
Returns:
The message with the conversation footer appended
"""
conversation_link = CONVERSATION_URL.format(conversation_id)
footer = f'\n\n[View full conversation]({conversation_link})'
return message + footer
def infer_repo_from_message(user_msg: str) -> list[str]:
"""
Extract all repository names in the format 'owner/repo' from various Git provider URLs

View File

@@ -1,171 +1,11 @@
"""Tests for enterprise integrations utils module."""
from unittest.mock import patch
import pytest
from integrations.utils import (
HOST_URL,
append_conversation_footer,
get_session_expired_message,
get_summary_for_agent_state,
get_user_not_found_message,
)
from openhands.core.schema.agent import AgentState
from openhands.events.observation.agent import AgentStateChangedObservation
class TestGetSummaryForAgentState:
"""Test cases for get_summary_for_agent_state function."""
def setup_method(self):
"""Set up test fixtures."""
self.conversation_link = 'https://example.com/conversation/123'
def test_empty_observations_list(self):
"""Test handling of empty observations list."""
result = get_summary_for_agent_state([], self.conversation_link)
assert 'unknown error' in result.lower()
assert self.conversation_link in result
@pytest.mark.parametrize(
'state,expected_text,includes_link',
[
(AgentState.RATE_LIMITED, 'rate limited', False),
(AgentState.AWAITING_USER_INPUT, 'waiting for your input', True),
],
)
def test_handled_agent_states(self, state, expected_text, includes_link):
"""Test handling of states with specific behavior."""
observation = AgentStateChangedObservation(
content=f'Agent state: {state.value}', agent_state=state
)
result = get_summary_for_agent_state([observation], self.conversation_link)
assert expected_text in result.lower()
if includes_link:
assert self.conversation_link in result
else:
assert self.conversation_link not in result
@pytest.mark.parametrize(
'state',
[
AgentState.FINISHED,
AgentState.PAUSED,
AgentState.STOPPED,
AgentState.AWAITING_USER_CONFIRMATION,
],
)
def test_unhandled_agent_states(self, state):
"""Test handling of unhandled states (should all return unknown error)."""
observation = AgentStateChangedObservation(
content=f'Agent state: {state.value}', agent_state=state
)
result = get_summary_for_agent_state([observation], self.conversation_link)
assert 'unknown error' in result.lower()
assert self.conversation_link in result
@pytest.mark.parametrize(
'error_code,expected_text',
[
(
'STATUS$ERROR_LLM_AUTHENTICATION',
'authentication with the llm provider failed',
),
(
'STATUS$ERROR_LLM_SERVICE_UNAVAILABLE',
'llm service is temporarily unavailable',
),
(
'STATUS$ERROR_LLM_INTERNAL_SERVER_ERROR',
'llm provider encountered an internal error',
),
('STATUS$ERROR_LLM_OUT_OF_CREDITS', "you've run out of credits"),
('STATUS$ERROR_LLM_CONTENT_POLICY_VIOLATION', 'content policy violation'),
],
)
def test_error_state_readable_reasons(self, error_code, expected_text):
"""Test all readable error reason mappings."""
observation = AgentStateChangedObservation(
content=f'Agent encountered error: {error_code}',
agent_state=AgentState.ERROR,
reason=error_code,
)
result = get_summary_for_agent_state([observation], self.conversation_link)
assert 'encountered an error' in result.lower()
assert expected_text in result.lower()
assert self.conversation_link in result
def test_error_state_with_custom_reason(self):
"""Test handling of ERROR state with a custom reason."""
observation = AgentStateChangedObservation(
content='Agent encountered an error',
agent_state=AgentState.ERROR,
reason='Test error message',
)
result = get_summary_for_agent_state([observation], self.conversation_link)
assert 'encountered an error' in result.lower()
assert 'test error message' in result.lower()
assert self.conversation_link in result
def test_multiple_observations_uses_first(self):
"""Test that when multiple observations are provided, only the first is used."""
observation1 = AgentStateChangedObservation(
content='Agent is awaiting user input',
agent_state=AgentState.AWAITING_USER_INPUT,
)
observation2 = AgentStateChangedObservation(
content='Agent encountered an error',
agent_state=AgentState.ERROR,
reason='Should not be used',
)
result = get_summary_for_agent_state(
[observation1, observation2], self.conversation_link
)
# Should handle the first observation (AWAITING_USER_INPUT), not the second (ERROR)
assert 'waiting for your input' in result.lower()
assert 'error' not in result.lower()
def test_awaiting_user_input_specific_message(self):
"""Test that AWAITING_USER_INPUT returns the specific expected message."""
observation = AgentStateChangedObservation(
content='Agent is awaiting user input',
agent_state=AgentState.AWAITING_USER_INPUT,
)
result = get_summary_for_agent_state([observation], self.conversation_link)
# Test the exact message format
assert 'waiting for your input' in result.lower()
assert 'continue the conversation' in result.lower()
assert self.conversation_link in result
assert 'unknown error' not in result.lower()
def test_rate_limited_specific_message(self):
"""Test that RATE_LIMITED returns the specific expected message."""
observation = AgentStateChangedObservation(
content='Agent was rate limited', agent_state=AgentState.RATE_LIMITED
)
result = get_summary_for_agent_state([observation], self.conversation_link)
# Test the exact message format
assert 'rate limited' in result.lower()
assert 'try again later' in result.lower()
# RATE_LIMITED doesn't include conversation link in response
assert self.conversation_link not in result
class TestGetSessionExpiredMessage:
"""Test cases for get_session_expired_message function."""
@@ -293,138 +133,3 @@ class TestGetUserNotFoundMessage:
result = get_user_not_found_message(None)
assert not result.startswith('@')
assert 'It looks like' in result
class TestAppendConversationFooter:
"""Test cases for append_conversation_footer function."""
@patch(
'integrations.utils.CONVERSATION_URL', 'https://example.com/conversations/{}'
)
def test_appends_footer_with_markdown_link(self):
"""Test that footer is appended with correct markdown link format."""
# Arrange
message = 'This is a test message'
conversation_id = 'test-conv-123'
# Act
result = append_conversation_footer(message, conversation_id)
# Assert
assert result.startswith(message)
assert (
'[View full conversation](https://example.com/conversations/test-conv-123)'
in result
)
assert result.endswith(
'[View full conversation](https://example.com/conversations/test-conv-123)'
)
@patch(
'integrations.utils.CONVERSATION_URL', 'https://example.com/conversations/{}'
)
def test_footer_does_not_contain_html_tags(self):
"""Test that footer does not contain HTML tags like <sub>."""
# Arrange
message = 'Test message'
conversation_id = 'test-conv-456'
# Act
result = append_conversation_footer(message, conversation_id)
# Assert
assert '<sub>' not in result
assert '</sub>' not in result
@patch(
'integrations.utils.CONVERSATION_URL', 'https://example.com/conversations/{}'
)
def test_footer_format_with_newlines(self):
"""Test that footer is properly separated with newlines."""
# Arrange
message = 'Original message content'
conversation_id = 'test-conv-789'
# Act
result = append_conversation_footer(message, conversation_id)
# Assert
assert (
result
== 'Original message content\n\n[View full conversation](https://example.com/conversations/test-conv-789)'
)
@patch(
'integrations.utils.CONVERSATION_URL', 'https://example.com/conversations/{}'
)
def test_empty_message_still_appends_footer(self):
"""Test that footer is appended even when message is empty."""
# Arrange
message = ''
conversation_id = 'empty-msg-conv'
# Act
result = append_conversation_footer(message, conversation_id)
# Assert
assert result.startswith('\n\n')
assert (
'[View full conversation](https://example.com/conversations/empty-msg-conv)'
in result
)
@patch(
'integrations.utils.CONVERSATION_URL', 'https://example.com/conversations/{}'
)
def test_conversation_id_with_special_characters(self):
"""Test that footer handles conversation IDs with special characters."""
# Arrange
message = 'Test message'
conversation_id = 'conv-123_abc-456'
# Act
result = append_conversation_footer(message, conversation_id)
# Assert
expected_url = 'https://example.com/conversations/conv-123_abc-456'
assert expected_url in result
assert '[View full conversation]' in result
@patch(
'integrations.utils.CONVERSATION_URL', 'https://example.com/conversations/{}'
)
def test_multiline_message_preserves_content(self):
"""Test that multiline messages are preserved correctly."""
# Arrange
message = 'Line 1\nLine 2\nLine 3'
conversation_id = 'multiline-conv'
# Act
result = append_conversation_footer(message, conversation_id)
# Assert
assert result.startswith('Line 1\nLine 2\nLine 3')
assert '\n\n[View full conversation]' in result
assert message in result
@patch(
'integrations.utils.CONVERSATION_URL', 'https://example.com/conversations/{}'
)
def test_footer_contains_only_markdown_syntax(self):
"""Test that footer uses only markdown syntax, not HTML."""
# Arrange
message = 'Test message'
conversation_id = 'markdown-test'
# Act
result = append_conversation_footer(message, conversation_id)
# Assert
footer_part = result[len(message) :]
# Should only contain markdown link syntax: [text](url)
assert footer_part.startswith('\n\n[')
assert '](' in footer_part
assert footer_part.endswith(')')
# Should not contain any HTML tags (specifically <sub> tags that were removed)
assert '<sub>' not in footer_part
assert '</sub>' not in footer_part