Compare commits

..

8 Commits

Author SHA1 Message Date
openhands a1cd8dd41f Update Linear install hook to not require workspace parameter
- Remove workspace_name parameter from API call
- Send empty object in POST request body
- Backend route will be updated to handle this case
2025-08-11 05:27:59 +00:00
openhands 2a7fbcd519 Refactor LinearInstallButton to use TanStack Query
- Replace direct window.location.href with TanStack Query mutation
- Add useLinearInstall hook for proper state management
- Add loading state and disabled button during installation
- Maintain same functionality while following React Query patterns
- Add proper error handling through TanStack Query
2025-08-11 05:23:05 +00:00
openhands 6ef82e1501 Update Linear integration UI and documentation 2025-08-11 02:46:24 +00:00
Tim O'Farrell 3302c31c60 Removed Hack that is no longer required (#10195) 2025-08-10 12:13:19 -06:00
Xingyao Wang 116ba199d1 feat(agent): stop using short tool description for gpt-5 (#10184) 2025-08-09 17:56:52 -04:00
Boxuan Li 803bdced9c Fix Windows prompt refinement: ensure 'bash' is replaced with 'powershell' in all prompts (#10179)
Co-authored-by: openhands <openhands@all-hands.dev>
2025-08-08 20:28:36 -07:00
Xingyao Wang 3eecac2003 docs: Add GPT-5 model recommendation and fix pricing display issue (#10177)
Co-authored-by: openhands <openhands@all-hands.dev>
2025-08-08 19:19:59 +00:00
mamoodi c02e09fc2d Hide Git Settings section from Application settings (#10176)
Co-authored-by: openhands <openhands@all-hands.dev>
2025-08-08 19:06:40 +00:00
28 changed files with 335 additions and 1342 deletions
+3 -18
View File
@@ -360,28 +360,13 @@ classpath = "my_package.my_module.MyCustomAgent"
[security]
# Enable confirmation mode (For Headless / CLI only - In Web this is overridden by Session Init)
# When using command_approval analyzer, this should be enabled
confirmation_mode = true
#confirmation_mode = false
# The security analyzer to use (For Headless / CLI only - In Web this is overridden by Session Init)
# Available options: "invariant", "command_approval"
# For CLI with confirmation mode, "command_approval" is recommended
security_analyzer = "command_approval"
#security_analyzer = ""
# Whether to enable security analyzer
# When using command_approval analyzer, this should be enabled
enable_security_analyzer = true
# Dictionary of approved commands that don't require confirmation
# The key is the command, and the value is a boolean (true to approve)
#approved_commands = { "ls -la" = true, "git status" = true }
# List of approved command patterns (regex) that don't require confirmation
#approved_command_patterns = [
# { pattern = "^ls( -[a-zA-Z]+)?( \\S+)?$", description = "List directory contents" },
# { pattern = "^cd \\S+$", description = "Change directory" },
# { pattern = "^git (status|log|diff)$", description = "Basic git commands" }
#]
#enable_security_analyzer = false
#################################### Condenser #################################
# Condensers control how conversation history is managed and compressed when
@@ -67,24 +67,27 @@ description: Complete guide for setting up Linear integration with OpenHands Clo
- Sign in with your Git provider (GitHub, GitLab, or BitBucket)
- **Important:** Make sure you're signing in with the same Git provider account that contains the repositories you want the OpenHands agent to work on.
### Step 2: Configure Linear Integration
### Step 2: View Available Workspaces
1. **Access Integration Settings**
- Navigate to **Settings** > **Integrations**
- Locate **Linear** section
2. **Configure Workspace**
- Click **Configure** button
- Enter your workspace name and click **Connect**
- If no integration exists, you'll be prompted to enter additional credentials required for the workspace integration:
- **Webhook Secret**: The webhook secret from Step 3 above
- **Service Account Email**: The service account email from Step 1 above
- **Service Account API Key**: The API key from Step 2 above
- Ensure **Active** toggle is enabled
2. **Install Linear App**
- Click the **Install Linear App** button
- You'll be redirected to a page showing available Linear workspaces
3. **Complete OAuth Flow**
3. **Select Workspace**
- Choose the workspace you want to connect to
- If no integration exists, you'll be prompted to enter additional credentials required for the workspace integration:
- **Webhook Secret**: The webhook secret from Step 3 above
- **Service Account Email**: The service account email from Step 1 above
- **Service Account API Key**: The API key from Step 2 above
- Ensure **Active** toggle is enabled
4. **Complete OAuth Flow**
- You'll be redirected to Linear to complete OAuth verification
- Grant the necessary permissions to verify your workspace access. If you have access to multiple workspaces, select the correct one that you initially provided
- Grant the necessary permissions to verify your workspace access
- If successful, you will be redirected back to the **Integrations** settings in the OpenHands Cloud UI
### Managing Your Integration
+1 -1
View File
@@ -18,7 +18,7 @@ Based on these findings and community feedback, these are the latest models that
### Cloud / API-Based Models
- [anthropic/claude-sonnet-4-20250514](https://www.anthropic.com/api) (recommended)
- [openai/o4-mini](https://openai.com/index/introducing-o3-and-o4-mini/)
- [openai/gpt-5-2025-08-07](https://openai.com/api/) (recommended)
- [gemini/gemini-2.5-pro](https://blog.google/technology/google-deepmind/gemini-model-thinking-updates-march-2025/)
- [deepseek/deepseek-chat](https://api-docs.deepseek.com/)
- [moonshot/kimi-k2-0711-preview](https://platform.moonshot.ai/docs/pricing/chat#generation-model-kimi-k2)
+1 -1
View File
@@ -32,4 +32,4 @@ When running OpenHands, you'll need to set the following in the OpenHands UI thr
Pricing follows official API provider rates. [You can view model prices here.](https://github.com/BerriAI/litellm/blob/main/model_prices_and_context_window.json)
For `qwen3-coder-480b`, we charge the cheapest FP8 rate available on openrouter: $0.4 per million input tokens and $1.6 per million output tokens.
For `qwen3-coder-480b`, we charge the cheapest FP8 rate available on openrouter: \$0.4 per million input tokens and \$1.6 per million output tokens.
@@ -10,6 +10,7 @@ import {
ConfigureButton,
ConfigureModal,
} from "#/components/features/settings/project-management/configure-modal";
import { LinearInstallButton } from "#/components/features/settings/project-management/linear-install-button";
interface IntegrationRowProps {
platform: "jira" | "jira-dc" | "linear";
@@ -88,23 +89,29 @@ export function IntegrationRow({
<div className="flex items-center justify-between" data-testid={dataTestId}>
<span className="font-medium">{platformName}</span>
<div className="flex items-center gap-6">
<ConfigureButton
onClick={handleConfigure}
isDisabled={isLoading}
text={buttonText}
data-testid={`${platform}-configure-button`}
/>
{platform === "linear" ? (
<LinearInstallButton data-testid={`${platform}-install-button`} />
) : (
<ConfigureButton
onClick={handleConfigure}
isDisabled={isLoading}
text={buttonText}
data-testid={`${platform}-configure-button`}
/>
)}
</div>
<ConfigureModal
isOpen={isConfigureModalOpen}
onClose={() => setConfigureModalOpen(false)}
onConfirm={handleConfigureConfirm}
onLink={handleLink}
onUnlink={handleUnlink}
platformName={platformName}
platform={platform}
integrationData={integrationData}
/>
{platform !== "linear" && (
<ConfigureModal
isOpen={isConfigureModalOpen}
onClose={() => setConfigureModalOpen(false)}
onConfirm={handleConfigureConfirm}
onLink={handleLink}
onUnlink={handleUnlink}
platformName={platformName}
platform={platform}
integrationData={integrationData}
/>
)}
</div>
);
}
@@ -0,0 +1,33 @@
import React from "react";
import { useTranslation } from "react-i18next";
import { I18nKey } from "#/i18n/declaration";
import { useLinearInstall } from "#/hooks/mutation/use-linear-install";
interface LinearInstallButtonProps {
"data-testid"?: string;
}
export function LinearInstallButton({
"data-testid": dataTestId,
}: LinearInstallButtonProps) {
const { t } = useTranslation();
const linearInstallMutation = useLinearInstall();
const handleInstallClick = () => {
linearInstallMutation.mutate();
};
return (
<button
type="button"
onClick={handleInstallClick}
disabled={linearInstallMutation.isPending}
className="px-4 py-2 bg-blue-600 text-white rounded-md hover:bg-blue-700 transition-colors disabled:opacity-50 disabled:cursor-not-allowed"
data-testid={dataTestId}
>
{linearInstallMutation.isPending
? "Installing..."
: t(I18nKey.PROJECT_MANAGEMENT$INSTALL_LINEAR_APP)}
</button>
);
}
@@ -0,0 +1,42 @@
import { useMutation } from "@tanstack/react-query";
import { useTranslation } from "react-i18next";
import { openHands } from "#/api/open-hands-axios";
import { I18nKey } from "#/i18n/declaration";
import { displayErrorToast } from "#/utils/custom-toast-handlers";
import { retrieveAxiosErrorMessage } from "#/utils/retrieve-axios-error-message";
export function useLinearInstall() {
const { t } = useTranslation();
return useMutation({
mutationFn: async () => {
const response = await openHands.post(
"/integration/linear/workspaces/link",
{},
);
const { success, redirect, authorizationUrl } = response.data;
if (success) {
if (redirect) {
if (authorizationUrl) {
window.location.href = authorizationUrl;
} else {
throw new Error("Could not get authorization URL from the server.");
}
} else {
window.location.reload();
}
} else {
throw new Error("Linear installation failed");
}
return response.data;
},
onError: (error) => {
const errorMessage = retrieveAxiosErrorMessage(error);
displayErrorToast(errorMessage || t(I18nKey.ERROR$GENERIC));
},
});
}
+1
View File
@@ -747,6 +747,7 @@ export enum I18nKey {
PROJECT_MANAGEMENT$UNLINK_BUTTON_LABEL = "PROJECT_MANAGEMENT$UNLINK_BUTTON_LABEL",
PROJECT_MANAGEMENT$LINK_CONFIRMATION_TITLE = "PROJECT_MANAGEMENT$LINK_CONFIRMATION_TITLE",
PROJECT_MANAGEMENT$CONFIGURE_BUTTON_LABEL = "PROJECT_MANAGEMENT$CONFIGURE_BUTTON_LABEL",
PROJECT_MANAGEMENT$INSTALL_LINEAR_APP = "PROJECT_MANAGEMENT$INSTALL_LINEAR_APP",
PROJECT_MANAGEMENT$EDIT_BUTTON_LABEL = "PROJECT_MANAGEMENT$EDIT_BUTTON_LABEL",
PROJECT_MANAGEMENT$WORKSPACE_NAME_LABEL = "PROJECT_MANAGEMENT$WORKSPACE_NAME_LABEL",
PROJECT_MANAGEMENT$WORKSPACE_NAME_PLACEHOLDER = "PROJECT_MANAGEMENT$WORKSPACE_NAME_PLACEHOLDER",
+7
View File
@@ -11951,6 +11951,13 @@
"de": "Konfigurieren",
"uk": "Налаштувати"
},
"PROJECT_MANAGEMENT$INSTALL_LINEAR_APP": {
"en": "Install Linear App",
"ja": "Linear アプリをインストール",
"zh-CN": "安装 Linear 应用",
"zh-TW": "安裝 Linear 應用",
"ko-KR": "Linear 앱 설치"
},
"PROJECT_MANAGEMENT$EDIT_BUTTON_LABEL": {
"en": "Edit",
"ja": "編集",
+1 -1
View File
@@ -222,7 +222,7 @@ function AppSettingsScreen() {
className="w-full max-w-[680px]" // Match the width of the language field
/>
<div className="border-t border-t-tertiary pt-6 mt-2">
<div className="border-t border-t-tertiary pt-6 mt-2 hidden">
<h3 className="text-lg font-medium mb-4">
{t(I18nKey.SETTINGS$GIT_SETTINGS)}
</h3>
@@ -106,10 +106,15 @@ class CodeActAgent(Agent):
def _get_tools(self) -> list['ChatCompletionToolParam']:
# For these models, we use short tool descriptions ( < 1024 tokens)
# to avoid hitting the OpenAI token limit for tool descriptions.
SHORT_TOOL_DESCRIPTION_LLM_SUBSTRS = ['gpt-', 'o3', 'o1', 'o4']
SHORT_TOOL_DESCRIPTION_LLM_SUBSTRS = ['gpt-4', 'o3', 'o1', 'o4']
use_short_tool_desc = False
if self.llm is not None:
# For historical reasons, previously OpenAI enforces max function description length of 1k characters
# https://community.openai.com/t/function-call-description-max-length/529902
# But it no longer seems to be an issue recently
# https://community.openai.com/t/was-the-character-limit-for-schema-descriptions-upgraded/1225975
# Tested on GPT-5 and longer description still works. But we still keep the logic to be safe for older models.
use_short_tool_desc = any(
model_substr in self.llm.config.model
for model_substr in SHORT_TOOL_DESCRIPTION_LLM_SUBSTRS
+11 -1
View File
@@ -1,3 +1,4 @@
import re
import sys
from litellm import ChatCompletionToolParam, ChatCompletionToolParamFunctionChunk
@@ -37,7 +38,16 @@ _SHORT_BASH_DESCRIPTION = """Execute a bash command in the terminal.
def refine_prompt(prompt: str):
if sys.platform == 'win32':
return prompt.replace('bash', 'powershell')
# Replace 'bash' with 'powershell' including tool names like 'execute_bash'
# First replace 'execute_bash' with 'execute_powershell' to handle tool names
result = re.sub(
r'\bexecute_bash\b', 'execute_powershell', prompt, flags=re.IGNORECASE
)
# Then replace standalone 'bash' with 'powershell'
result = re.sub(
r'(?<!execute_)(?<!_)\bbash\b', 'powershell', result, flags=re.IGNORECASE
)
return result
return prompt
+2 -37
View File
@@ -236,43 +236,8 @@ async def run_session(
)
return
# Get the pending action from the agent controller
pending_action = controller._pending_action
command = ''
if pending_action:
if hasattr(pending_action, 'command'):
command = pending_action.command
elif hasattr(pending_action, 'code'):
command = pending_action.code
confirmation_status = await read_confirmation_input(
config, command, pending_action
)
# Handle different confirmation responses
if confirmation_status == 'always':
# Set always confirm mode to skip future confirmations
always_confirm_mode = True
event_stream.add_event(
ChangeAgentStateAction(AgentState.USER_CONFIRMED),
EventSource.USER,
)
elif confirmation_status.startswith('remember:'):
# Parse the remember response: remember:pattern:description
parts = confirmation_status.split(':', 2)
if len(parts) == 3:
_, pattern, description = parts
# Save the command pattern to config
from openhands.cli.utils import save_approved_command_to_config
save_approved_command_to_config(
command, pattern=pattern, description=description
)
event_stream.add_event(
ChangeAgentStateAction(AgentState.USER_CONFIRMED),
EventSource.USER,
)
elif confirmation_status == 'yes':
confirmation_status = await read_confirmation_input(config)
if confirmation_status in ('yes', 'always'):
event_stream.add_event(
ChangeAgentStateAction(AgentState.USER_CONFIRMED),
EventSource.USER,
+3 -213
View File
@@ -700,140 +700,12 @@ async def read_prompt_input(
return '/exit'
def _generate_single_command_pattern(command: str) -> list[str]:
"""Generate candidate regex patterns for a single command based on prefixes.
Args:
command: The single command to generate patterns for.
Returns:
list[str]: List of candidate regex patterns that match similar commands.
"""
import re
# Split command into parts
parts = command.split()
if not parts:
return [f'^{re.escape(command)}$']
patterns = []
# Generate patterns for first word, first two words, first three words
for i in range(1, min(4, len(parts) + 1)):
prefix_parts = parts[:i]
escaped_prefix = '\\s+'.join(re.escape(part) for part in prefix_parts)
# Create pattern: prefix followed by word boundary and anything until end of line
# This prevents matching partial words (e.g., "ls" won't match "lsof")
if i == 1:
# For single word, add word boundary to prevent partial matches
pattern = f'^{escaped_prefix}(\\s.*|$)'
else:
# For multiple words, the space already acts as a boundary
pattern = f'^{escaped_prefix}.*$'
patterns.append(pattern)
return patterns
def _parse_piped_command(command: str) -> list[str]:
"""Parse a command that may contain pipes into individual commands.
Args:
command: The command string to parse.
Returns:
list[str]: List of individual commands split by pipes.
"""
import shlex
# Handle edge cases first
if not command or not command.strip():
return []
# Use shlex to split the command, which handles quotes and escapes properly
try:
# Split the entire command into tokens
tokens = shlex.split(command, posix=True)
except ValueError:
# If shlex fails (e.g., unmatched quotes), fall back to simple split
return [part.strip() for part in command.split('|') if part.strip()]
# Find pipe tokens and split the command accordingly
parts = []
current_part: list[str] = []
for token in tokens:
if token == '|':
if current_part:
parts.append(' '.join(current_part))
current_part = []
else:
current_part.append(token)
# Add the last part if it exists
if current_part:
parts.append(' '.join(current_part))
# If no pipes were found in tokens, check if the original command has pipes
# This handles cases where pipes are not separated by spaces
if len(parts) <= 1 and '|' in command:
return [part.strip() for part in command.split('|') if part.strip()]
return parts
def _generate_command_patterns(command: str) -> list[str]:
"""Generate candidate regex patterns for a command that can match similar commands.
This function handles both simple commands and piped commands.
For piped commands, it generates patterns for each sub-command.
Args:
command: The command to generate patterns for (may contain pipes).
Returns:
list[str]: List of candidate regex patterns that match similar commands.
"""
# Parse the command to handle pipes
sub_commands = _parse_piped_command(command)
if len(sub_commands) <= 1:
# Single command or empty, return all candidate patterns
return _generate_single_command_pattern(command)
else:
# Piped command, generate pattern for each sub-command
# For simplicity, we'll just use the first pattern from each sub-command
sub_patterns = []
for sub_command in sub_commands:
patterns = _generate_single_command_pattern(sub_command.strip())
if patterns:
# Use the most specific pattern (first one)
sub_pattern = patterns[0]
# Remove the ^ and $ anchors from sub-patterns
if sub_pattern.startswith('^'):
sub_pattern = sub_pattern[1:]
if sub_pattern.endswith('$'):
sub_pattern = sub_pattern[:-1]
sub_patterns.append(sub_pattern)
if sub_patterns:
# Join sub-patterns with pipe separator (allowing flexible whitespace around pipes)
pattern = '\\s*\\|\\s*'.join(sub_patterns)
return [f'^{pattern}$']
else:
return []
async def read_confirmation_input(
config: OpenHandsConfig, command: str = '', pending_action=None
) -> str:
async def read_confirmation_input(config: OpenHandsConfig) -> str:
try:
choices = [
'Yes, proceed',
'No (and allow to enter instructions)',
'Always proceed (skip all confirmations)',
"Yes, and don't ask again for similar commands",
"Always proceed (don't ask again)",
]
# keep the outer coroutine responsive by using asyncio.to_thread which puts the blocking call app.run() of cli_confirm() in a separate thread
@@ -841,89 +713,7 @@ async def read_confirmation_input(
cli_confirm, config, 'Choose an option:', choices
)
result = {0: 'yes', 1: 'no', 2: 'always', 3: 'remember'}.get(index, 'no')
# If the user chose "remember", show pattern selection options
if result == 'remember' and command:
return await _handle_pattern_selection(config, command)
return result
except (KeyboardInterrupt, EOFError):
return 'no'
async def _handle_pattern_selection(config: OpenHandsConfig, command: str) -> str:
"""Handle pattern selection when user chooses to remember similar commands."""
try:
# Generate candidate patterns
patterns = _generate_command_patterns(command)
# Create pattern descriptions
pattern_choices = []
pattern_data = []
# Add exact command option
import re
exact_pattern = f'^{re.escape(command)}$'
pattern_choices.append(f'Exact command: {command}')
pattern_data.append(('exact', exact_pattern, f'Exact command: {command}'))
# Add prefix-based patterns
parts = command.split()
if parts:
for i, pattern in enumerate(patterns):
if i < len(parts):
prefix = ' '.join(parts[: i + 1])
description = f'Commands starting with: {prefix}'
pattern_choices.append(description)
pattern_data.append(('pattern', pattern, description))
# Add custom pattern option
pattern_choices.append('Enter custom pattern')
pattern_data.append(('custom', '', 'Custom pattern'))
# Show pattern selection menu
pattern_index = await asyncio.to_thread(
cli_confirm, config, 'Choose which commands to remember:', pattern_choices
)
if pattern_index < len(pattern_data):
pattern_type, pattern_value, description = pattern_data[pattern_index]
if pattern_type == 'custom':
# Get custom pattern from user
prompt_session = create_prompt_session(config)
print_formatted_text(
HTML(
'<gold>Enter a custom regex pattern (example: ^ls.*$ for all ls commands):</gold>'
)
)
try:
custom_pattern = await prompt_session.prompt_async('Pattern: ')
if custom_pattern.strip():
# Validate the regex pattern
import re
try:
re.compile(custom_pattern.strip())
return f'remember:{custom_pattern.strip()}:Custom pattern: {custom_pattern.strip()}'
except re.error:
print_formatted_text(
HTML(
'<ansired>Invalid regex pattern. Using exact command instead.</ansired>'
)
)
return f'remember:^{re.escape(command)}$:Exact command: {command}'
else:
return 'no'
except (KeyboardInterrupt, EOFError):
return 'no'
else:
return f'remember:{pattern_value}:{description}'
return 'no'
return {0: 'yes', 1: 'no', 2: 'always'}.get(index, 'no')
except (KeyboardInterrupt, EOFError):
return 'no'
-79
View File
@@ -243,82 +243,3 @@ def read_file(file_path: str | Path) -> str:
def write_to_file(file_path: str | Path, content: str) -> None:
with open(file_path, 'w') as f:
f.write(content)
def save_approved_command_to_config(
command: str, pattern: str | None = None, description: str | None = None
) -> None:
"""Save an approved command or pattern to the config file.
Args:
command: The command to save as approved.
pattern: Optional regex pattern to save instead of exact command.
description: Optional description for the pattern.
"""
config_path = _LOCAL_CONFIG_FILE_PATH
# Load existing config or create a new one
if config_path.exists():
try:
with open(config_path, 'r') as f:
config_data = toml.load(f)
except Exception as e:
from openhands.core.logger import openhands_logger
openhands_logger.warning(f'Error loading config file: {e}')
config_data = {}
else:
config_data = {}
config_path.parent.mkdir(parents=True, exist_ok=True)
# Ensure security section exists
if 'security' not in config_data:
config_data['security'] = {}
if pattern and description:
# Save as a pattern
if 'approved_command_patterns' not in config_data['security']:
config_data['security']['approved_command_patterns'] = []
# Check if pattern already exists
pattern_exists = any(
p == pattern
for p in config_data['security']['approved_command_patterns']
if isinstance(p, str)
) or any(
p.get('pattern') == pattern
for p in config_data['security']['approved_command_patterns']
if isinstance(p, dict)
)
if not pattern_exists:
# Just save the pattern string directly
config_data['security']['approved_command_patterns'].append(pattern)
from openhands.core.logger import openhands_logger
openhands_logger.info(
f"Pattern '{pattern}' saved to approved command patterns in {config_path}"
)
else:
# Save as exact command
if 'approved_commands' not in config_data['security']:
config_data['security']['approved_commands'] = {}
# Add the command to the approved commands
config_data['security']['approved_commands'][command] = True
from openhands.core.logger import openhands_logger
openhands_logger.info(
f"Command '{command}' saved to approved commands in {config_path}"
)
# Write the updated config back to the file
try:
with open(config_path, 'w') as f:
toml.dump(config_data, f)
except Exception as e:
from openhands.core.logger import openhands_logger
openhands_logger.error(f'Error saving approved command to config: {e}')
+3 -34
View File
@@ -875,40 +875,9 @@ class AgentController:
if self.state.confirmation_mode and (
type(action) is CmdRunAction or type(action) is IPythonRunCellAction
):
# Check if the command is already approved
command = ''
if type(action) is CmdRunAction:
command = action.command
elif type(action) is IPythonRunCellAction:
command = action.code
# Get the security config
import toml
from openhands.core.config import SecurityConfig
# Load security config from the config file
security_config = SecurityConfig()
try:
with open('config.toml', 'r', encoding='utf-8') as f:
config_data = toml.load(f)
if 'security' in config_data:
security_config = SecurityConfig.model_validate(
config_data['security']
)
except Exception:
# If loading fails, use default config
pass
# Check if the command is approved
if security_config.is_command_approved(command):
# Command is already approved, no need for confirmation
action.confirmation_state = ActionConfirmationStatus.CONFIRMED
else:
# Command needs confirmation
action.confirmation_state = (
ActionConfirmationStatus.AWAITING_CONFIRMATION
)
action.confirmation_state = (
ActionConfirmationStatus.AWAITING_CONFIRMATION
)
self._pending_action = action
if not isinstance(action, NullAction):
+1 -47
View File
@@ -1,8 +1,5 @@
from pydantic import BaseModel, ConfigDict, Field, ValidationError
# ApprovedCommandPattern is now just a string containing the regex pattern
ApprovedCommandPattern = str
class SecurityConfig(BaseModel):
"""Configuration for security related functionalities.
@@ -10,33 +7,13 @@ class SecurityConfig(BaseModel):
Attributes:
confirmation_mode: Whether to enable confirmation mode.
security_analyzer: The security analyzer to use.
approved_command_patterns: List of regex patterns for commands that don't require confirmation.
approved_commands: Dictionary of exact commands that have been approved.
"""
confirmation_mode: bool = Field(default=False)
security_analyzer: str | None = Field(default=None)
approved_command_patterns: list[ApprovedCommandPattern] = Field(
default_factory=list
)
approved_commands: dict[str, bool] = Field(default_factory=dict)
model_config = ConfigDict(extra='forbid')
def is_command_approved(self, command: str) -> bool:
"""Check if a command is approved.
This is a stub method that always returns False.
The actual implementation is in CommandApprovalAnalyzer.
Args:
command: The command to check.
Returns:
bool: Always False in this stub implementation.
"""
return False
@classmethod
def from_toml_section(cls, data: dict) -> dict[str, 'SecurityConfig']:
"""
@@ -51,32 +28,9 @@ class SecurityConfig(BaseModel):
# Initialize the result mapping
security_mapping: dict[str, SecurityConfig] = {}
# Extract approved command patterns if present
approved_patterns = []
if 'approved_command_patterns' in data:
patterns_data = data.pop('approved_command_patterns')
if isinstance(patterns_data, list):
for pattern_data in patterns_data:
# Handle the new format (just a string pattern)
if isinstance(pattern_data, str):
approved_patterns.append(pattern_data)
# Handle the old format (dict with pattern and description)
elif isinstance(pattern_data, dict) and 'pattern' in pattern_data:
approved_patterns.append(pattern_data['pattern'])
# Extract approved commands if present
approved_commands = {}
if 'approved_commands' in data:
commands_data = data.pop('approved_commands')
if isinstance(commands_data, dict):
approved_commands = commands_data
# Try to create the configuration instance
try:
config = cls.model_validate(data)
config.approved_command_patterns = approved_patterns
config.approved_commands = approved_commands
security_mapping['security'] = config
security_mapping['security'] = cls.model_validate(data)
except ValidationError as e:
raise ValueError(f'Invalid security configuration: {e}')
+1 -1
View File
@@ -383,7 +383,7 @@ Do NOT assume the environment is the same as in the example above.
"""
example = example.lstrip()
return example
return refine_prompt(example)
IN_CONTEXT_LEARNING_EXAMPLE_PREFIX = get_example_for_tools
-2
View File
@@ -1,9 +1,7 @@
from openhands.security.analyzer import SecurityAnalyzer
from openhands.security.command_approval.analyzer import CommandApprovalAnalyzer
from openhands.security.invariant.analyzer import InvariantAnalyzer
__all__ = [
'SecurityAnalyzer',
'InvariantAnalyzer',
'CommandApprovalAnalyzer',
]
@@ -1,3 +0,0 @@
from openhands.security.command_approval.analyzer import CommandApprovalAnalyzer
__all__ = ['CommandApprovalAnalyzer']
@@ -1,305 +0,0 @@
"""Command approval analyzer for security."""
import re
from typing import Any
import bashlex
from fastapi import Request
from openhands.core.logger import openhands_logger as logger
from openhands.events.action.action import (
Action,
ActionConfirmationStatus,
ActionSecurityRisk,
)
from openhands.events.action.commands import CmdRunAction, IPythonRunCellAction
from openhands.events.event import Event
from openhands.events.stream import EventStream
from openhands.security.analyzer import SecurityAnalyzer
class CommandPattern:
"""A pattern for matching commands."""
def __init__(self, pattern: str, description: str):
"""Initialize a new command pattern.
Args:
pattern: The regex pattern to match commands against.
description: A human-readable description of what this pattern matches.
"""
self.pattern = pattern
self.description = description
self._compiled_pattern = re.compile(pattern)
def matches(self, command: str) -> bool:
"""Check if the command matches this pattern.
Args:
command: The command to check.
Returns:
bool: True if the command matches, False otherwise.
"""
return bool(self._compiled_pattern.match(command))
class CommandParser:
"""Parser for bash commands using bashlex."""
def is_piped_command(self, command: str) -> bool:
"""Check if a command contains pipes.
Args:
command: The command to check.
Returns:
bool: True if the command contains pipes, False otherwise.
"""
if not command or not command.strip():
return False
try:
parts = bashlex.parse(command)
for part in parts:
if part.kind == 'pipeline':
return True
return False
except Exception as e:
logger.warning(f'Error parsing command with bashlex: {e}')
# Fallback: check for pipe character not in quotes
# This is a simple heuristic and not as accurate as bashlex parsing
in_single_quote = False
in_double_quote = False
for char in command:
if char == "'" and not in_double_quote:
in_single_quote = not in_single_quote
elif char == '"' and not in_single_quote:
in_double_quote = not in_double_quote
elif char == '|' and not in_single_quote and not in_double_quote:
return True
return False
def parse_command(self, command: str) -> list[str]:
"""Parse a command into individual parts, handling pipes.
Args:
command: The command to parse.
Returns:
List[str]: List of individual commands.
"""
if not command or not command.strip():
return []
try:
parts = bashlex.parse(command)
commands = []
# Helper function to extract command from a node
def extract_command(node):
if node.kind == 'command':
cmd_parts = []
for part in node.parts:
if hasattr(part, 'word'):
cmd_parts.append(part.word)
if cmd_parts:
return ' '.join(cmd_parts)
return None
# Process the AST
for part in parts:
if part.kind == 'pipeline':
# A pipeline has multiple commands
for subpart in part.parts:
if subpart.kind == 'command':
cmd = extract_command(subpart)
if cmd:
commands.append(cmd)
elif part.kind == 'command':
# A single command
cmd = extract_command(part)
if cmd:
commands.append(cmd)
elif part.kind == 'list':
# A list of commands (e.g., with && or ||)
# We only take the first command for approval purposes
for subpart in part.parts:
if subpart.kind == 'command':
cmd = extract_command(subpart)
if cmd:
commands.append(cmd)
break
elif subpart.kind == 'operator':
# Stop at the first operator
break
return commands
except Exception as e:
logger.warning(f'Error parsing command with bashlex: {e}')
# Fallback: simple split by pipe
# This is a simple heuristic and not as accurate as bashlex parsing
if '|' in command:
return [part.strip() for part in command.split('|') if part.strip()]
else:
return [command.strip()] if command.strip() else []
class CommandApprovalAnalyzer(SecurityAnalyzer):
"""Security analyzer that automatically approves commands based on patterns and previously approved commands."""
def __init__(
self,
event_stream: EventStream,
policy: str | None = None,
sid: str | None = None,
) -> None:
"""Initializes a new instance of the CommandApprovalAnalyzer class."""
super().__init__(event_stream)
self.parser = CommandParser()
self.approved_commands: dict[
str, bool
] = {} # Dict of exact commands that have been approved
self.approved_patterns: list[
CommandPattern
] = [] # List of regex patterns for approved commands
self.compiled_patterns: dict[
str, re.Pattern
] = {} # Cache of compiled regex patterns
# Add some default patterns
self._add_default_patterns()
def _add_default_patterns(self) -> None:
"""Add default command patterns that are always approved."""
# Simple, safe commands
self.approved_patterns.append(
CommandPattern(
pattern=r'^ls(\s+-[a-zA-Z]+)*(\s+\S+)*$',
description='List directory contents',
)
)
self.approved_patterns.append(
CommandPattern(pattern=r'^cd(\s+\S+)?$', description='Change directory')
)
self.approved_patterns.append(
CommandPattern(pattern=r'^pwd$', description='Print working directory')
)
self.approved_patterns.append(
CommandPattern(pattern=r'^echo\s+.*$', description='Echo text')
)
def is_command_approved(self, command: str) -> bool:
"""Check if a command is approved and doesn't need confirmation.
Args:
command: The command to check.
Returns:
bool: True if the command is approved, False otherwise.
"""
if not command or not command.strip():
return False
# Check if this is a piped command
if self.parser.is_piped_command(command):
# For piped commands, all parts must be approved
sub_commands = self.parser.parse_command(command)
return all(self._is_single_command_approved(cmd) for cmd in sub_commands)
else:
# For single commands, just check directly
return self._is_single_command_approved(command)
def _is_single_command_approved(self, command: str) -> bool:
"""Check if a single (non-piped) command is approved.
Args:
command: The command to check.
Returns:
bool: True if the command is approved, False otherwise.
"""
command = command.strip()
# Check exact matches first
if command in self.approved_commands:
return self.approved_commands[command]
# Then check patterns from CommandPattern objects
for pattern in self.approved_patterns:
if pattern.matches(command):
return True
# Then check string patterns from the config
from openhands.core.config import load_openhands_config
try:
config = load_openhands_config()
if hasattr(config, 'security') and hasattr(
config.security, 'approved_command_patterns'
):
for pattern_str in config.security.approved_command_patterns:
# Compile the pattern if not already compiled
if pattern_str not in self.compiled_patterns:
try:
self.compiled_patterns[pattern_str] = re.compile(
pattern_str
)
except re.error:
# Skip invalid patterns
continue
# Check if the command matches the pattern
if self.compiled_patterns[pattern_str].match(command):
return True
except Exception:
# If there's any error loading the config, just continue without checking patterns
pass
return False
def approve_command(self, command: str) -> None:
"""Add a command to the approved commands list.
Args:
command: The command to approve.
"""
self.approved_commands[command] = True
# In a real implementation, we would save this to config.toml
logger.info(f"Command '{command}' approved for future use")
async def handle_api_request(self, request: Request) -> Any:
"""Handles the incoming API request."""
# This analyzer doesn't need to handle API requests
return {'message': "Command approval analyzer doesn't support API requests"}
async def security_risk(self, event: Action) -> ActionSecurityRisk:
"""Evaluates the Action for security risks and returns the risk level.
For command approval analyzer, we always return LOW risk level,
but we set the confirmation_state based on whether the command is approved.
"""
# Only process CmdRunAction and IPythonRunCellAction
if isinstance(event, CmdRunAction):
command = event.command
if self.is_command_approved(command):
event.confirmation_state = ActionConfirmationStatus.CONFIRMED
logger.info(f'Command automatically approved: {command}')
elif isinstance(event, IPythonRunCellAction):
code = event.code
if self.is_command_approved(code):
event.confirmation_state = ActionConfirmationStatus.CONFIRMED
logger.info(f'Python code automatically approved: {code}')
# Always return LOW risk level - we're not evaluating risk, just auto-approving
return ActionSecurityRisk.LOW
async def act(self, event: Event) -> None:
"""Performs an action based on the analyzed event.
This analyzer doesn't need to perform any actions since command approval
is handled directly in the CLI interface.
"""
pass
-2
View File
@@ -1,8 +1,6 @@
from openhands.security.analyzer import SecurityAnalyzer
from openhands.security.command_approval.analyzer import CommandApprovalAnalyzer
from openhands.security.invariant.analyzer import InvariantAnalyzer
SecurityAnalyzers: dict[str, type[SecurityAnalyzer]] = {
'invariant': InvariantAnalyzer,
'command_approval': CommandApprovalAnalyzer,
}
-78
View File
@@ -1,78 +0,0 @@
"""
LiteLLM currently have an issue where HttpHandlers are being created but not
closed. We have submitted a PR to them, (https://github.com/BerriAI/litellm/pull/8711)
and their dev team say they are in the process of a refactor that will fix this, but
in the meantime, we need to manage the lifecycle of the httpx.Client manually.
We can't simply pass in our own client object, because all the different implementations use
different types of client object.
So we monkey patch the httpx.Client class to track newly created instances and close these
when the operations complete. (Since some paths create a single shared client and reuse these,
we actually need to create a proxy object that allows these clients to be reusable.)
Hopefully, this will be fixed soon and we can remove this abomination.
"""
import contextlib
from typing import Callable
import httpx
@contextlib.contextmanager
def ensure_httpx_close():
wrapped_class = httpx.Client
proxys = []
class ClientProxy:
"""
Sometimes LiteLLM opens a new httpx client for each connection, and does not close them.
Sometimes it does close them. Sometimes, it reuses a client between connections. For cases
where a client is reused, we need to be able to reuse the client even after closing it.
"""
client_constructor: Callable
args: tuple
kwargs: dict
client: httpx.Client
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
self.client = wrapped_class(*self.args, **self.kwargs)
proxys.append(self)
def __getattr__(self, name):
# Invoke a method on the proxied client - create one if required
if self.client is None:
self.client = wrapped_class(*self.args, **self.kwargs)
return getattr(self.client, name)
def close(self):
# Close the client if it is open
if self.client:
self.client.close()
self.client = None
def __iter__(self, *args, **kwargs):
# We have to override this as debuggers invoke it causing the client to reopen
if self.client:
return self.client.iter(*args, **kwargs)
return object.__getattribute__(self, 'iter')(*args, **kwargs)
@property
def is_closed(self):
# Check if closed
if self.client is None:
return True
return self.client.is_closed
httpx.Client = ClientProxy
try:
yield
finally:
httpx.Client = wrapped_class
while proxys:
proxy = proxys.pop()
proxy.close()
+3 -1
View File
@@ -4,6 +4,7 @@ from itertools import islice
from jinja2 import Template
from openhands.agenthub.codeact_agent.tools.bash import refine_prompt
from openhands.controller.state.state import State
from openhands.core.message import Message, TextContent
from openhands.events.observation.agent import MicroagentKnowledge
@@ -91,7 +92,8 @@ class PromptManager:
return Template(file.read())
def get_system_message(self) -> str:
return self.system_template.render().strip()
system_message = self.system_template.render().strip()
return refine_prompt(system_message)
def get_example_user_message(self) -> str:
"""This is an initial user message that can be provided to the agent
-373
View File
@@ -1,373 +0,0 @@
"""Tests for command pattern generation and parsing."""
import re
from unittest.mock import MagicMock, patch
import pytest
from openhands.cli.tui import (
_generate_command_patterns,
_generate_single_command_pattern,
_handle_pattern_selection,
_parse_piped_command,
read_confirmation_input,
)
class TestGenerateSingleCommandPattern:
"""Test the _generate_single_command_pattern function."""
def test_empty_command(self):
"""Test pattern generation for empty command."""
patterns = _generate_single_command_pattern('')
assert len(patterns) == 1
assert patterns[0] == '^$'
def test_single_word_command(self):
"""Test pattern generation for single word command."""
patterns = _generate_single_command_pattern('ls')
assert len(patterns) == 1
assert patterns[0] == '^ls(\\s.*|$)'
def test_two_word_command(self):
"""Test pattern generation for two word command."""
patterns = _generate_single_command_pattern('ls -la')
assert len(patterns) == 2
assert patterns[0] == '^ls(\\s.*|$)'
assert patterns[1] == '^ls\\s+\\-la.*$'
def test_three_word_command(self):
"""Test pattern generation for three word command."""
patterns = _generate_single_command_pattern('git commit -m')
assert len(patterns) == 3
assert patterns[0] == '^git(\\s.*|$)'
assert patterns[1] == '^git\\s+commit.*$'
assert patterns[2] == '^git\\s+commit\\s+\\-m.*$'
def test_four_word_command(self):
"""Test pattern generation for four word command (should only generate 3 patterns)."""
patterns = _generate_single_command_pattern('git commit -m message')
assert len(patterns) == 3 # Only first 3 prefixes
assert patterns[0] == '^git(\\s.*|$)'
assert patterns[1] == '^git\\s+commit.*$'
assert patterns[2] == '^git\\s+commit\\s+\\-m.*$'
def test_pattern_matching(self):
"""Test that generated patterns actually match similar commands."""
patterns = _generate_single_command_pattern('ls -la')
# Test first pattern (ls.*)
pattern1 = re.compile(patterns[0])
assert pattern1.match('ls')
assert pattern1.match('ls -la')
assert pattern1.match('ls -alh /home')
assert not pattern1.match('cat file.txt')
# Test second pattern (ls -la.*)
pattern2 = re.compile(patterns[1])
assert pattern2.match('ls -la')
assert pattern2.match('ls -la /home')
assert not pattern2.match('ls')
assert not pattern2.match('ls -alh')
def test_special_characters_escaped(self):
"""Test that special regex characters are properly escaped."""
patterns = _generate_single_command_pattern('echo $HOME')
assert len(patterns) == 2
assert patterns[0] == '^echo(\\s.*|$)'
assert patterns[1] == '^echo\\s+\\$HOME.*$'
# Test that the pattern works
pattern = re.compile(patterns[1])
assert pattern.match('echo $HOME')
assert pattern.match('echo $HOME/test')
class TestParsePipedCommand:
"""Test the _parse_piped_command function."""
def test_empty_command(self):
"""Test parsing empty command."""
result = _parse_piped_command('')
assert result == []
def test_whitespace_only_command(self):
"""Test parsing whitespace-only command."""
result = _parse_piped_command(' ')
assert result == []
def test_single_command(self):
"""Test parsing single command without pipes."""
result = _parse_piped_command('ls -la')
assert result == ['ls -la']
def test_simple_piped_command(self):
"""Test parsing simple piped command."""
result = _parse_piped_command('ls -la | grep test')
assert result == ['ls -la', 'grep test']
def test_three_command_pipe(self):
"""Test parsing three-command pipe."""
result = _parse_piped_command('cat file.txt | grep pattern | wc -l')
assert result == ['cat file.txt', 'grep pattern', 'wc -l']
def test_pipe_with_quotes(self):
"""Test parsing piped command with quoted arguments."""
result = _parse_piped_command('echo "hello world" | grep "hello"')
# shlex removes quotes, so we get the unquoted content
assert result == ['echo hello world', 'grep hello']
def test_pipe_without_spaces(self):
"""Test parsing piped command without spaces around pipes."""
result = _parse_piped_command('ls|grep test')
assert result == ['ls', 'grep test']
def test_complex_command_with_options(self):
"""Test parsing complex command with various options."""
result = _parse_piped_command(
"find /home -name '*.py' | xargs grep -l 'import os'"
)
# shlex removes quotes, so we get the unquoted content
assert result == ['find /home -name *.py', 'xargs grep -l import os']
def test_invalid_quotes(self):
"""Test parsing command with invalid quotes falls back gracefully."""
result = _parse_piped_command('echo "unclosed quote | grep test')
assert result == ['echo "unclosed quote', 'grep test']
class TestGenerateCommandPatterns:
"""Test the _generate_command_patterns function."""
def test_single_command(self):
"""Test pattern generation for single command."""
patterns = _generate_command_patterns('ls -la')
assert len(patterns) == 2
assert patterns[0] == '^ls(\\s.*|$)'
assert patterns[1] == '^ls\\s+\\-la.*$'
def test_piped_command(self):
"""Test pattern generation for piped command."""
patterns = _generate_command_patterns('ls -la | grep test')
assert len(patterns) == 1
# Should combine the first pattern from each sub-command
assert patterns[0] == '^ls(\\s.*|$)\\s*\\|\\s*grep(\\s.*|$)$'
def test_empty_command(self):
"""Test pattern generation for empty command."""
patterns = _generate_command_patterns('')
assert len(patterns) == 1
assert patterns[0] == '^$'
class TestReadConfirmationInput:
"""Test the read_confirmation_input function."""
@pytest.mark.asyncio
@patch('openhands.cli.tui.cli_confirm')
async def test_yes_option(self, mock_confirm):
"""Test selecting 'yes' option."""
mock_confirm.return_value = 0 # First option (Yes, proceed)
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await read_confirmation_input(config=config, command='ls -la')
assert result == 'yes'
@pytest.mark.asyncio
@patch('openhands.cli.tui.cli_confirm')
async def test_no_option(self, mock_confirm):
"""Test selecting 'no' option."""
mock_confirm.return_value = 1 # Second option (No)
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await read_confirmation_input(config=config, command='ls -la')
assert result == 'no'
@pytest.mark.asyncio
@patch('openhands.cli.tui.cli_confirm')
async def test_always_option(self, mock_confirm):
"""Test selecting 'always' option."""
mock_confirm.return_value = 2 # Third option (Always proceed)
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await read_confirmation_input(config=config, command='ls -la')
assert result == 'always'
@pytest.mark.asyncio
@patch('openhands.cli.tui._handle_pattern_selection')
@patch('openhands.cli.tui.cli_confirm')
async def test_remember_option(self, mock_confirm, mock_pattern_selection):
"""Test selecting 'remember' option."""
mock_confirm.return_value = 3 # Fourth option (Remember)
mock_pattern_selection.return_value = (
'remember:^ls.*$:Commands starting with: ls'
)
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await read_confirmation_input(config=config, command='ls -la')
assert result == 'remember:^ls.*$:Commands starting with: ls'
mock_pattern_selection.assert_called_once_with(config, 'ls -la')
class TestHandlePatternSelection:
"""Test the _handle_pattern_selection function."""
@pytest.mark.asyncio
@patch('openhands.cli.tui.cli_confirm')
async def test_exact_command_selection(self, mock_confirm):
"""Test selecting exact command pattern."""
mock_confirm.return_value = 0 # First option (exact command)
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await _handle_pattern_selection(config, 'ls -la')
assert result.startswith('remember:^ls\\ \\-la$:Exact command: ls -la')
@pytest.mark.asyncio
@patch('openhands.cli.tui.cli_confirm')
async def test_prefix_pattern_selection(self, mock_confirm):
"""Test selecting prefix pattern."""
mock_confirm.return_value = 1 # Second option (first prefix pattern)
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await _handle_pattern_selection(config, 'ls -la')
assert result.startswith('remember:^ls(\\s.*|$):Commands starting with: ls')
@pytest.mark.asyncio
@patch('openhands.cli.tui.create_prompt_session')
@patch('openhands.cli.tui.cli_confirm')
async def test_custom_pattern_selection_valid(
self, mock_confirm, mock_create_session
):
"""Test selecting custom pattern with valid regex."""
mock_confirm.return_value = (
3 # Custom pattern option (assuming 3 total options)
)
# Mock the prompt session
mock_session = MagicMock()
# Create a proper async mock
async def mock_prompt_async(prompt):
return '^git.*$'
mock_session.prompt_async = mock_prompt_async
mock_create_session.return_value = mock_session
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await _handle_pattern_selection(config, 'ls -la')
assert result == 'remember:^git.*$:Custom pattern: ^git.*$'
@pytest.mark.asyncio
@patch('openhands.cli.tui.create_prompt_session')
@patch('openhands.cli.tui.cli_confirm')
@patch('openhands.cli.tui.print_formatted_text')
async def test_custom_pattern_selection_invalid(
self, mock_print, mock_confirm, mock_create_session
):
"""Test selecting custom pattern with invalid regex."""
mock_confirm.return_value = 3 # Custom pattern option
# Mock the prompt session to return invalid regex
mock_session = MagicMock()
# Create a proper async mock
async def mock_prompt_async(prompt):
return '[invalid regex'
mock_session.prompt_async = mock_prompt_async
mock_create_session.return_value = mock_session
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await _handle_pattern_selection(config, 'ls -la')
# Should fall back to exact command
assert result.startswith('remember:^ls\\ \\-la$:Exact command: ls -la')
# Should print error message
mock_print.assert_called()
class TestPatternMatching:
"""Test that the generated patterns work correctly for matching commands."""
def test_ls_patterns(self):
"""Test patterns generated for ls command."""
patterns = _generate_single_command_pattern('ls -alh')
# Test first pattern (ls.*)
pattern1 = re.compile(patterns[0])
assert pattern1.match('ls')
assert pattern1.match('ls -la')
assert pattern1.match('ls -alh /home')
assert pattern1.match('ls --help')
assert not pattern1.match('cat file.txt')
assert not pattern1.match('lsof') # Should not match partial word
# Test second pattern (ls -alh.*)
pattern2 = re.compile(patterns[1])
assert pattern2.match('ls -alh')
assert pattern2.match('ls -alh /home')
assert not pattern2.match('ls')
assert not pattern2.match('ls -la')
def test_git_patterns(self):
"""Test patterns generated for git command."""
patterns = _generate_single_command_pattern('git commit -m')
# Test git(\s.*|$)
pattern1 = re.compile(patterns[0])
assert pattern1.match('git status')
assert pattern1.match('git commit')
assert pattern1.match('git push origin main')
assert not pattern1.match('github') # Should not match partial word
# Test git commit.*
pattern2 = re.compile(patterns[1])
assert pattern2.match('git commit')
assert pattern2.match("git commit -m 'message'")
assert pattern2.match('git commit --amend')
assert not pattern2.match('git status')
assert not pattern2.match('git push')
def test_piped_command_patterns(self):
"""Test patterns generated for piped commands."""
patterns = _generate_command_patterns('cat file.txt | grep pattern')
pattern = re.compile(patterns[0])
assert pattern.match('cat file.txt | grep pattern')
assert pattern.match('cat another.txt | grep something')
assert pattern.match('cat /path/to/file | grep test')
assert not pattern.match('cat file.txt')
assert not pattern.match('grep pattern')
def test_complex_command_patterns(self):
"""Test patterns for complex commands with special characters."""
patterns = _generate_single_command_pattern("find /home -name '*.py'")
# Test find(\s.*|$)
pattern1 = re.compile(patterns[0])
assert pattern1.match("find /home -name '*.py'")
assert pattern1.match('find . -type f')
assert pattern1.match('find /usr/bin -executable')
assert not pattern1.match('finder') # Should not match partial word
# Test find /home.*
pattern2 = re.compile(patterns[1])
assert pattern2.match("find /home -name '*.py'")
assert pattern2.match('find /home -type d')
assert not pattern2.match("find . -name '*.py'")
assert not pattern2.match("find /usr -name '*.py'")
-69
View File
@@ -1,69 +0,0 @@
import httpx
from openhands.utils.ensure_httpx_close import ensure_httpx_close
def test_ensure_httpx_close_basic():
"""Test basic functionality of ensure_httpx_close."""
ctx = ensure_httpx_close()
with ctx:
# Create a client - should be tracked
client = httpx.Client()
# After context exit, client should be closed
assert client.is_closed
def test_ensure_httpx_close_multiple_clients():
"""Test ensure_httpx_close with multiple clients."""
ctx = ensure_httpx_close()
with ctx:
client1 = httpx.Client()
client2 = httpx.Client()
assert client1.is_closed
assert client2.is_closed
def test_ensure_httpx_close_nested():
"""Test nested usage of ensure_httpx_close."""
with ensure_httpx_close():
client1 = httpx.Client()
with ensure_httpx_close():
client2 = httpx.Client()
assert not client2.is_closed
# After inner context, client2 should be closed
assert client2.is_closed
# client1 should still be open since outer context is still active
assert not client1.is_closed
# After outer context, both clients should be closed
assert client1.is_closed
assert client2.is_closed
def test_ensure_httpx_close_exception():
"""Test ensure_httpx_close when an exception occurs."""
client = None
try:
with ensure_httpx_close():
client = httpx.Client()
raise ValueError('Test exception')
except ValueError:
pass
# Client should be closed even if an exception occurred
assert client is not None
assert client.is_closed
def test_ensure_httpx_close_restore_client():
"""Test that the original client is restored after context exit."""
original_client = httpx.Client
with ensure_httpx_close():
assert httpx.Client != original_client
# Original __init__ should be restored
assert httpx.Client == original_client
@@ -1,48 +0,0 @@
"""Tests for the security config pipe parsing functionality."""
from openhands.security.command_approval.analyzer import CommandParser
class TestBashlexParsing:
"""Test bashlex parsing functionality."""
def test_bashlex_pipe_detection(self):
"""Test detection of piped commands using bashlex."""
parser = CommandParser()
# Commands with pipes
assert parser.is_piped_command('ls -la | grep .py')
assert parser.is_piped_command('cat file.txt | head -10 | tail -5')
assert parser.is_piped_command("find . -name '*.py' | xargs grep 'import'")
# Commands without pipes
assert not parser.is_piped_command('ls -la')
assert not parser.is_piped_command("echo 'hello world'")
assert not parser.is_piped_command('ls -la > output.txt')
# Edge cases
assert not parser.is_piped_command('')
# Pipe in quotes is not a real pipe
assert not parser.is_piped_command("echo 'hello | world'")
def test_bashlex_command_extraction(self):
"""Test extraction of commands from pipelines using bashlex."""
parser = CommandParser()
# Simple command
assert parser.parse_command('ls -la') == ['ls -la']
# Piped commands
assert parser.parse_command('ls -la | grep .py') == ['ls -la', 'grep .py']
assert parser.parse_command('cat file.txt | head -10 | tail -5') == [
'cat file.txt',
'head -10',
'tail -5',
]
# Commands with redirections
assert parser.parse_command('ls -la > output.txt') == ['ls -la']
# Edge cases
assert parser.parse_command('') == []
assert parser.parse_command("echo 'hello | world'") == ['echo hello | world']
@@ -0,0 +1,179 @@
import sys
from unittest.mock import patch
import pytest
from openhands.agenthub.codeact_agent.codeact_agent import CodeActAgent
from openhands.core.config import AgentConfig
from openhands.llm.llm import LLM
# Skip all tests in this module if not running on Windows
pytestmark = pytest.mark.skipif(
sys.platform != 'win32', reason='Windows prompt refinement tests require Windows'
)
@pytest.fixture
def mock_llm():
"""Create a mock LLM for testing."""
llm = LLM(config={'model': 'gpt-4', 'api_key': 'test'})
return llm
@pytest.fixture
def agent_config():
"""Create a basic agent config for testing."""
return AgentConfig()
def test_codeact_agent_system_prompt_no_bash_on_windows(mock_llm, agent_config):
"""Test that CodeActAgent's system prompt doesn't contain 'bash' on Windows."""
# Create a CodeActAgent instance
agent = CodeActAgent(llm=mock_llm, config=agent_config)
# Get the system prompt
system_prompt = agent.prompt_manager.get_system_message()
# Assert that 'bash' doesn't exist in the system prompt (case-insensitive)
assert 'bash' not in system_prompt.lower(), (
f"System prompt contains 'bash' on Windows platform. "
f"It should be replaced with 'powershell'. "
f'System prompt: {system_prompt}'
)
# Verify that 'powershell' exists instead (case-insensitive)
assert 'powershell' in system_prompt.lower(), (
f"System prompt should contain 'powershell' on Windows platform. "
f'System prompt: {system_prompt}'
)
def test_codeact_agent_tool_descriptions_no_bash_on_windows(mock_llm, agent_config):
"""Test that CodeActAgent's tool descriptions don't contain 'bash' on Windows."""
# Create a CodeActAgent instance
agent = CodeActAgent(llm=mock_llm, config=agent_config)
# Get the tools
tools = agent.tools
# Check each tool's description and parameters
for tool in tools:
if tool['type'] == 'function':
function_info = tool['function']
# Check function description
description = function_info.get('description', '')
assert 'bash' not in description.lower(), (
f"Tool '{function_info['name']}' description contains 'bash' on Windows. "
f'Description: {description}'
)
# Check parameter descriptions
parameters = function_info.get('parameters', {})
properties = parameters.get('properties', {})
for param_name, param_info in properties.items():
param_description = param_info.get('description', '')
assert 'bash' not in param_description.lower(), (
f"Tool '{function_info['name']}' parameter '{param_name}' "
f"description contains 'bash' on Windows. "
f'Parameter description: {param_description}'
)
def test_in_context_learning_example_no_bash_on_windows():
"""Test that in-context learning examples don't contain 'bash' on Windows."""
from openhands.agenthub.codeact_agent.tools.bash import create_cmd_run_tool
from openhands.agenthub.codeact_agent.tools.finish import FinishTool
from openhands.agenthub.codeact_agent.tools.str_replace_editor import (
create_str_replace_editor_tool,
)
from openhands.llm.fn_call_converter import get_example_for_tools
# Create a sample set of tools
tools = [
create_cmd_run_tool(),
create_str_replace_editor_tool(),
FinishTool,
]
# Get the in-context learning example
example = get_example_for_tools(tools)
# Assert that 'bash' doesn't exist in the example (case-insensitive)
assert 'bash' not in example.lower(), (
f"In-context learning example contains 'bash' on Windows platform. "
f"It should be replaced with 'powershell'. "
f'Example: {example}'
)
# Verify that 'powershell' exists instead (case-insensitive)
if example: # Only check if example is not empty
assert 'powershell' in example.lower(), (
f"In-context learning example should contain 'powershell' on Windows platform. "
f'Example: {example}'
)
def test_refine_prompt_function_works():
"""Test that the refine_prompt function correctly replaces 'bash' with 'powershell'."""
from openhands.agenthub.codeact_agent.tools.bash import refine_prompt
# Test basic replacement
test_prompt = 'Execute a bash command to list files'
refined_prompt = refine_prompt(test_prompt)
assert 'bash' not in refined_prompt.lower()
assert 'powershell' in refined_prompt.lower()
assert refined_prompt == 'Execute a powershell command to list files'
# Test multiple occurrences
test_prompt = 'Use bash to run bash commands in the bash shell'
refined_prompt = refine_prompt(test_prompt)
assert 'bash' not in refined_prompt.lower()
assert (
refined_prompt
== 'Use powershell to run powershell commands in the powershell shell'
)
# Test case sensitivity
test_prompt = 'BASH and Bash and bash should all be replaced'
refined_prompt = refine_prompt(test_prompt)
assert 'bash' not in refined_prompt.lower()
assert (
refined_prompt
== 'powershell and powershell and powershell should all be replaced'
)
# Test execute_bash tool name replacement
test_prompt = 'Use the execute_bash tool to run commands'
refined_prompt = refine_prompt(test_prompt)
assert 'execute_bash' not in refined_prompt.lower()
assert 'execute_powershell' in refined_prompt.lower()
assert refined_prompt == 'Use the execute_powershell tool to run commands'
# Test that words containing 'bash' but not equal to 'bash' are preserved
test_prompt = 'The bashful person likes bash-like syntax'
refined_prompt = refine_prompt(test_prompt)
# 'bashful' should be preserved, 'bash-like' should become 'powershell-like'
assert 'bashful' in refined_prompt
assert 'powershell-like' in refined_prompt
assert refined_prompt == 'The bashful person likes powershell-like syntax'
def test_refine_prompt_function_on_non_windows():
"""Test that the refine_prompt function doesn't change anything on non-Windows platforms."""
from openhands.agenthub.codeact_agent.tools.bash import refine_prompt
# Mock sys.platform to simulate non-Windows
with patch('openhands.agenthub.codeact_agent.tools.bash.sys.platform', 'linux'):
test_prompt = 'Execute a bash command to list files'
refined_prompt = refine_prompt(test_prompt)
# On non-Windows, the prompt should remain unchanged
assert refined_prompt == test_prompt
assert 'bash' in refined_prompt.lower()