mirror of
https://github.com/All-Hands-AI/OpenHands.git
synced 2026-04-29 03:00:45 -04:00
319 lines
12 KiB
Python
319 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
|
|
from jinja2 import Environment, FileSystemLoader
|
|
from server.constants import WEB_HOST
|
|
|
|
from openhands.app_server.integrations.service_types import Repository
|
|
|
|
# ---- DO NOT REMOVE ----
|
|
# WARNING: Langfuse depends on the WEB_HOST environment variable being set to track events.
|
|
HOST = WEB_HOST
|
|
# ---- DO NOT REMOVE ----
|
|
|
|
IS_LOCAL_DEPLOYMENT = 'localhost' in HOST
|
|
HOST_URL = f'https://{HOST}' if not IS_LOCAL_DEPLOYMENT else f'http://{HOST}'
|
|
GITLAB_WEBHOOK_URL = f'{HOST_URL}/integration/gitlab/events'
|
|
CONVERSATION_URL = f'{HOST_URL}/conversations/{{}}'
|
|
|
|
# Toggle for auto-response feature that proactively starts conversations with users when workflow tests fail
|
|
ENABLE_PROACTIVE_CONVERSATION_STARTERS = (
|
|
os.getenv('ENABLE_PROACTIVE_CONVERSATION_STARTERS', 'false').lower() == 'true'
|
|
)
|
|
|
|
|
|
def get_session_expired_message(username: str | None = None) -> str:
|
|
"""Get a user-friendly session expired message.
|
|
|
|
Used by integrations to notify users when their Keycloak offline session
|
|
has expired.
|
|
|
|
Args:
|
|
username: Optional username to mention in the message. If provided,
|
|
the message will include @username prefix (used by Git providers
|
|
like GitHub, GitLab, Slack). If None, returns a generic message
|
|
(used by Jira, Jira DC, Linear).
|
|
|
|
Returns:
|
|
A formatted session expired message
|
|
"""
|
|
if username:
|
|
return f'@{username} your session has expired. Please login again at [OpenHands Cloud]({HOST_URL}) and try again.'
|
|
return f'Your session has expired. Please login again at [OpenHands Cloud]({HOST_URL}) and try again.'
|
|
|
|
|
|
def get_user_not_found_message(username: str | None = None) -> str:
|
|
"""Get a user-friendly message when a user hasn't created an OpenHands account.
|
|
|
|
Used by integrations to notify users when they try to use OpenHands features
|
|
but haven't logged into OpenHands Cloud yet (no Keycloak account exists).
|
|
|
|
Args:
|
|
username: Optional username to mention in the message. If provided,
|
|
the message will include @username prefix (used by Git providers
|
|
like GitHub, GitLab, Slack). If None, returns a generic message.
|
|
|
|
Returns:
|
|
A formatted user not found message
|
|
"""
|
|
if username:
|
|
return f"@{username} it looks like you haven't created an OpenHands account yet. Please sign up at [OpenHands Cloud]({HOST_URL}) and try again."
|
|
return f"It looks like you haven't created an OpenHands account yet. Please sign up at [OpenHands Cloud]({HOST_URL}) and try again."
|
|
|
|
|
|
OPENHANDS_RESOLVER_TEMPLATES_DIR = (
|
|
os.getenv('OPENHANDS_RESOLVER_TEMPLATES_DIR')
|
|
or 'openhands/app_server/integrations/templates/resolver/'
|
|
)
|
|
_jinja_env = Environment(loader=FileSystemLoader(OPENHANDS_RESOLVER_TEMPLATES_DIR))
|
|
|
|
|
|
def get_oh_labels(web_host: str) -> tuple[str, str]:
|
|
"""Get the OpenHands labels based on the web host.
|
|
|
|
Args:
|
|
web_host: The web host string to check
|
|
|
|
Returns:
|
|
A tuple of (oh_label, inline_oh_label) where:
|
|
- oh_label is 'openhands-exp' for staging/local hosts, 'openhands' otherwise
|
|
- inline_oh_label is '@openhands-exp' for staging/local hosts, '@openhands' otherwise
|
|
"""
|
|
web_host = web_host.strip()
|
|
is_staging_or_local = 'staging' in web_host or 'local' in web_host
|
|
oh_label = 'openhands-exp' if is_staging_or_local else 'openhands'
|
|
inline_oh_label = '@openhands-exp' if is_staging_or_local else '@openhands'
|
|
return oh_label, inline_oh_label
|
|
|
|
|
|
def get_summary_instruction():
|
|
summary_instruction_template = _jinja_env.get_template('summary_prompt.j2')
|
|
summary_instruction = summary_instruction_template.render()
|
|
return summary_instruction
|
|
|
|
|
|
def has_exact_mention(text: str, mention: str) -> bool:
|
|
"""Check if the text contains an exact mention (not part of a larger word).
|
|
|
|
Args:
|
|
text: The text to check for mentions
|
|
mention: The mention to look for (e.g. "@openhands")
|
|
|
|
Returns:
|
|
bool: True if the exact mention is found, False otherwise
|
|
|
|
Example:
|
|
>>> has_exact_mention("Hello @openhands!", "@openhands") # True
|
|
>>> has_exact_mention("Hello @openhands-agent!", "@openhands") # False
|
|
>>> has_exact_mention("(@openhands)", "@openhands") # True
|
|
>>> has_exact_mention("user@openhands.com", "@openhands") # False
|
|
>>> has_exact_mention("Hello @OpenHands!", "@openhands") # True (case-insensitive)
|
|
"""
|
|
# Convert both text and mention to lowercase for case-insensitive matching
|
|
text_lower = text.lower()
|
|
mention_lower = mention.lower()
|
|
|
|
pattern = re.escape(mention_lower)
|
|
# Match mention that is not part of a larger word
|
|
return bool(re.search(rf'(?:^|[^\w@]){pattern}(?![\w-])', text_lower))
|
|
|
|
|
|
def infer_repo_from_message(user_msg: str) -> list[str]:
|
|
"""
|
|
Extract all repository names in the format 'owner/repo' from various Git provider URLs
|
|
and direct mentions in text. Supports GitHub, GitLab, and BitBucket.
|
|
"""
|
|
normalized_msg = re.sub(r'\s+', ' ', user_msg.strip())
|
|
|
|
git_url_pattern = (
|
|
r'https?://(?:github\.com|gitlab\.com|bitbucket\.org)/'
|
|
r'([a-zA-Z0-9_.-]+)/([a-zA-Z0-9_.-]+?)(?:\.git)?'
|
|
r'(?:[/?#].*?)?(?=\s|$|[^\w.-])'
|
|
)
|
|
|
|
# UPDATED: allow {{ owner/repo }} in addition to existing boundaries
|
|
direct_pattern = (
|
|
r'(?:^|\s|{{|[\[\(\'":`])' # left boundary
|
|
r'([a-zA-Z0-9_.-]+)/([a-zA-Z0-9_.-]+)'
|
|
r'(?=\s|$|}}|[\]\)\'",.:`])' # right boundary
|
|
)
|
|
|
|
# Use dict to preserve ordering
|
|
matches: dict[str, bool] = {}
|
|
|
|
# Git URLs first (highest priority)
|
|
for owner, repo in re.findall(git_url_pattern, normalized_msg):
|
|
repo = re.sub(r'\.git$', '', repo)
|
|
matches[f'{owner}/{repo}'] = True
|
|
|
|
# Direct mentions
|
|
for owner, repo in re.findall(direct_pattern, normalized_msg):
|
|
full_match = f'{owner}/{repo}'
|
|
|
|
if (
|
|
re.match(r'^\d+\.\d+/\d+\.\d+$', full_match)
|
|
or re.match(r'^\d{1,2}/\d{1,2}$', full_match)
|
|
or re.match(r'^[A-Z]/[A-Z]$', full_match)
|
|
or repo.endswith(('.txt', '.md', '.py', '.js'))
|
|
or ('.' in repo and len(repo.split('.')) > 2)
|
|
):
|
|
continue
|
|
|
|
if full_match not in matches:
|
|
matches[full_match] = True
|
|
|
|
result = list(matches)
|
|
return result
|
|
|
|
|
|
def filter_potential_repos_by_user_msg(
|
|
user_msg: str, user_repos: list[Repository]
|
|
) -> tuple[bool, list[Repository]]:
|
|
"""Filter repositories based on user message inference."""
|
|
inferred_repos = infer_repo_from_message(user_msg)
|
|
if not inferred_repos:
|
|
return False, user_repos[0:99]
|
|
|
|
final_repos = []
|
|
for repo in user_repos:
|
|
# Check if the repo matches any of the inferred repositories
|
|
for inferred_repo in inferred_repos:
|
|
if inferred_repo.lower() in repo.full_name.lower():
|
|
final_repos.append(repo)
|
|
break # Avoid adding the same repo multiple times
|
|
|
|
# no repos matched, return original list
|
|
if len(final_repos) == 0:
|
|
return False, user_repos[0:99]
|
|
|
|
# Found exact match
|
|
elif len(final_repos) == 1:
|
|
return True, final_repos
|
|
|
|
# Found partial matches
|
|
return False, final_repos[0:99]
|
|
|
|
|
|
def markdown_to_jira_markup(markdown_text: str) -> str:
|
|
"""
|
|
Convert markdown text to Jira Wiki Markup format.
|
|
This function handles common markdown elements and converts them to their
|
|
Jira Wiki Markup equivalents. It's designed to be exception-safe.
|
|
Args:
|
|
markdown_text: The markdown text to convert
|
|
Returns:
|
|
str: The converted Jira Wiki Markup text
|
|
"""
|
|
if not markdown_text or not isinstance(markdown_text, str):
|
|
return ''
|
|
|
|
try:
|
|
# Work with a copy to avoid modifying the original
|
|
text = markdown_text
|
|
|
|
# Convert headers (# ## ### #### ##### ######)
|
|
text = re.sub(r'^#{6}\s+(.*?)$', r'h6. \1', text, flags=re.MULTILINE)
|
|
text = re.sub(r'^#{5}\s+(.*?)$', r'h5. \1', text, flags=re.MULTILINE)
|
|
text = re.sub(r'^#{4}\s+(.*?)$', r'h4. \1', text, flags=re.MULTILINE)
|
|
text = re.sub(r'^#{3}\s+(.*?)$', r'h3. \1', text, flags=re.MULTILINE)
|
|
text = re.sub(r'^#{2}\s+(.*?)$', r'h2. \1', text, flags=re.MULTILINE)
|
|
text = re.sub(r'^#{1}\s+(.*?)$', r'h1. \1', text, flags=re.MULTILINE)
|
|
|
|
# Convert code blocks first (before other formatting)
|
|
text = re.sub(
|
|
r'```(\w+)\n(.*?)\n```', r'{code:\1}\n\2\n{code}', text, flags=re.DOTALL
|
|
)
|
|
text = re.sub(r'```\n(.*?)\n```', r'{code}\n\1\n{code}', text, flags=re.DOTALL)
|
|
|
|
# Convert inline code (`code`)
|
|
text = re.sub(r'`([^`]+)`', r'{{\1}}', text)
|
|
|
|
# Convert markdown formatting to Jira formatting
|
|
# Use temporary placeholders to avoid conflicts between bold and italic conversion
|
|
|
|
# First convert bold (double markers) to temporary placeholders
|
|
text = re.sub(r'\*\*(.*?)\*\*', r'JIRA_BOLD_START\1JIRA_BOLD_END', text)
|
|
text = re.sub(r'__(.*?)__', r'JIRA_BOLD_START\1JIRA_BOLD_END', text)
|
|
|
|
# Now convert single asterisk italics
|
|
text = re.sub(r'\*([^*]+?)\*', r'_\1_', text)
|
|
|
|
# Convert underscore italics
|
|
text = re.sub(r'(?<!_)_([^_]+?)_(?!_)', r'_\1_', text)
|
|
|
|
# Finally, restore bold markers
|
|
text = text.replace('JIRA_BOLD_START', '*')
|
|
text = text.replace('JIRA_BOLD_END', '*')
|
|
|
|
# Convert links [text](url)
|
|
text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'[\1|\2]', text)
|
|
|
|
# Convert unordered lists (- or * or +)
|
|
text = re.sub(r'^[\s]*[-*+]\s+(.*?)$', r'* \1', text, flags=re.MULTILINE)
|
|
|
|
# Convert ordered lists (1. 2. etc.)
|
|
text = re.sub(r'^[\s]*\d+\.\s+(.*?)$', r'# \1', text, flags=re.MULTILINE)
|
|
|
|
# Convert strikethrough (~~text~~)
|
|
text = re.sub(r'~~(.*?)~~', r'-\1-', text)
|
|
|
|
# Convert horizontal rules (---, ***, ___)
|
|
text = re.sub(r'^[\s]*[-*_]{3,}[\s]*$', r'----', text, flags=re.MULTILINE)
|
|
|
|
# Convert blockquotes (> text)
|
|
text = re.sub(r'^>\s+(.*?)$', r'bq. \1', text, flags=re.MULTILINE)
|
|
|
|
# Convert tables (basic support)
|
|
# This is a simplified table conversion - Jira tables are quite different
|
|
lines = text.split('\n')
|
|
in_table = False
|
|
converted_lines = []
|
|
|
|
for line in lines:
|
|
if (
|
|
'|' in line
|
|
and line.strip().startswith('|')
|
|
and line.strip().endswith('|')
|
|
):
|
|
# Skip markdown table separator lines (contain ---)
|
|
if '---' in line:
|
|
continue
|
|
if not in_table:
|
|
in_table = True
|
|
# Convert markdown table row to Jira table row
|
|
cells = [cell.strip() for cell in line.split('|')[1:-1]]
|
|
converted_line = '|' + '|'.join(cells) + '|'
|
|
converted_lines.append(converted_line)
|
|
elif in_table and line.strip() and '|' not in line:
|
|
in_table = False
|
|
converted_lines.append(line)
|
|
else:
|
|
in_table = False
|
|
converted_lines.append(line)
|
|
|
|
text = '\n'.join(converted_lines)
|
|
|
|
return text
|
|
|
|
except Exception as e:
|
|
# Log the error but don't raise it - return original text as fallback
|
|
print(f'Error converting markdown to Jira markup: {str(e)}')
|
|
return markdown_text or ''
|
|
|
|
|
|
def format_jira_comment_body(message: str) -> dict:
|
|
"""Format a message as a Jira API v2 comment body.
|
|
|
|
This helper ensures consistent comment formatting across all Jira integrations.
|
|
Converts markdown to Jira Wiki Markup and wraps in the expected API structure.
|
|
|
|
Args:
|
|
message: The message content to send (may contain markdown)
|
|
|
|
Returns:
|
|
dict: The comment body in Jira API v2 format {'body': ...}
|
|
"""
|
|
return {'body': markdown_to_jira_markup(message)}
|