Compare commits

..

12 Commits

Author SHA1 Message Date
Xingyao Wang d89595a9cf Merge commit '116ba199d1c0d35b87af59254d1249c4fdd1fde5' into improve-cli-colors 2025-08-10 11:38:58 -04:00
Xingyao Wang 116ba199d1 feat(agent): stop using short tool description for gpt-5 (#10184) 2025-08-09 17:56:52 -04:00
Boxuan Li 803bdced9c Fix Windows prompt refinement: ensure 'bash' is replaced with 'powershell' in all prompts (#10179)
Co-authored-by: openhands <openhands@all-hands.dev>
2025-08-08 20:28:36 -07:00
Xingyao Wang 3eecac2003 docs: Add GPT-5 model recommendation and fix pricing display issue (#10177)
Co-authored-by: openhands <openhands@all-hands.dev>
2025-08-08 19:19:59 +00:00
mamoodi c02e09fc2d Hide Git Settings section from Application settings (#10176)
Co-authored-by: openhands <openhands@all-hands.dev>
2025-08-08 19:06:40 +00:00
openhands 53872a4d55 Fix test_message_action_from_agent to use display_agent_message instead of display_message 2025-07-30 18:08:12 +00:00
openhands f56314bda6 Fix poetry.lock and linting issues 2025-07-30 16:43:32 +00:00
openhands 166d7a4d1a Fix TypeScript errors and mypy errors in CLI colors PR 2025-07-30 16:36:15 +00:00
openhands db478cbc7e Fix markdown rendering in CLI and frontend linting issues 2025-07-30 16:12:34 +00:00
openhands a86a0e7792 Merge main into improve-cli-colors branch 2025-07-30 15:33:18 +00:00
openhands 9dfc85f4e3 Fix tests for new CLI colors feature 2025-07-19 16:06:57 +00:00
openhands e9c844087c Improve CLI colors for agent finish and message actions
- Add distinctive colors for AgentFinishAction with success/partial/failed status indicators
- Add soft blue styling for agent MessageAction to distinguish from regular output
- Import AgentFinishAction and create dedicated display functions
- Use bright green for finish actions and soft blue for agent messages
- Add visual status indicators (, ⚠️, ) and emoji titles for better UX
- Maintain backward compatibility with existing CLI functionality
2025-07-17 19:16:12 +00:00
31 changed files with 483 additions and 1196 deletions
+3 -18
View File
@@ -360,28 +360,13 @@ classpath = "my_package.my_module.MyCustomAgent"
[security]
# Enable confirmation mode (For Headless / CLI only - In Web this is overridden by Session Init)
# When using command_approval analyzer, this should be enabled
confirmation_mode = true
#confirmation_mode = false
# The security analyzer to use (For Headless / CLI only - In Web this is overridden by Session Init)
# Available options: "invariant", "command_approval"
# For CLI with confirmation mode, "command_approval" is recommended
security_analyzer = "command_approval"
#security_analyzer = ""
# Whether to enable security analyzer
# When using command_approval analyzer, this should be enabled
enable_security_analyzer = true
# Dictionary of approved commands that don't require confirmation
# The key is the command, and the value is a boolean (true to approve)
#approved_commands = { "ls -la" = true, "git status" = true }
# List of approved command patterns (regex) that don't require confirmation
#approved_command_patterns = [
# { pattern = "^ls( -[a-zA-Z]+)?( \\S+)?$", description = "List directory contents" },
# { pattern = "^cd \\S+$", description = "Change directory" },
# { pattern = "^git (status|log|diff)$", description = "Basic git commands" }
#]
#enable_security_analyzer = false
#################################### Condenser #################################
# Condensers control how conversation history is managed and compressed when
+1 -1
View File
@@ -40,7 +40,7 @@ repos:
hooks:
- id: mypy
additional_dependencies:
[types-requests, types-setuptools, types-pyyaml, types-toml, types-docker, pydantic, lxml]
[types-requests, types-setuptools, types-pyyaml, types-toml, types-docker, pydantic, lxml, types-Markdown]
# To see gaps add `--html-report mypy-report/`
entry: mypy --config-file dev_config/python/mypy.ini openhands/
always_run: true
+1 -1
View File
@@ -18,7 +18,7 @@ Based on these findings and community feedback, these are the latest models that
### Cloud / API-Based Models
- [anthropic/claude-sonnet-4-20250514](https://www.anthropic.com/api) (recommended)
- [openai/o4-mini](https://openai.com/index/introducing-o3-and-o4-mini/)
- [openai/gpt-5-2025-08-07](https://openai.com/api/) (recommended)
- [gemini/gemini-2.5-pro](https://blog.google/technology/google-deepmind/gemini-model-thinking-updates-march-2025/)
- [deepseek/deepseek-chat](https://api-docs.deepseek.com/)
- [moonshot/kimi-k2-0711-preview](https://platform.moonshot.ai/docs/pricing/chat#generation-model-kimi-k2)
+1 -1
View File
@@ -32,4 +32,4 @@ When running OpenHands, you'll need to set the following in the OpenHands UI thr
Pricing follows official API provider rates. [You can view model prices here.](https://github.com/BerriAI/litellm/blob/main/model_prices_and_context_window.json)
For `qwen3-coder-480b`, we charge the cheapest FP8 rate available on openrouter: $0.4 per million input tokens and $1.6 per million output tokens.
For `qwen3-coder-480b`, we charge the cheapest FP8 rate available on openrouter: \$0.4 per million input tokens and \$1.6 per million output tokens.
+54
View File
@@ -6072,6 +6072,60 @@
"node": ">=14.0.0"
}
},
"node_modules/@tailwindcss/oxide-wasm32-wasi/node_modules/@emnapi/core": {
"version": "1.4.3",
"inBundle": true,
"license": "MIT",
"optional": true,
"dependencies": {
"@emnapi/wasi-threads": "1.0.2",
"tslib": "^2.4.0"
}
},
"node_modules/@tailwindcss/oxide-wasm32-wasi/node_modules/@emnapi/runtime": {
"version": "1.4.3",
"inBundle": true,
"license": "MIT",
"optional": true,
"dependencies": {
"tslib": "^2.4.0"
}
},
"node_modules/@tailwindcss/oxide-wasm32-wasi/node_modules/@emnapi/wasi-threads": {
"version": "1.0.2",
"inBundle": true,
"license": "MIT",
"optional": true,
"dependencies": {
"tslib": "^2.4.0"
}
},
"node_modules/@tailwindcss/oxide-wasm32-wasi/node_modules/@napi-rs/wasm-runtime": {
"version": "0.2.11",
"inBundle": true,
"license": "MIT",
"optional": true,
"dependencies": {
"@emnapi/core": "^1.4.3",
"@emnapi/runtime": "^1.4.3",
"@tybys/wasm-util": "^0.9.0"
}
},
"node_modules/@tailwindcss/oxide-wasm32-wasi/node_modules/@tybys/wasm-util": {
"version": "0.9.0",
"inBundle": true,
"license": "MIT",
"optional": true,
"dependencies": {
"tslib": "^2.4.0"
}
},
"node_modules/@tailwindcss/oxide-wasm32-wasi/node_modules/tslib": {
"version": "2.8.0",
"inBundle": true,
"license": "0BSD",
"optional": true
},
"node_modules/@tailwindcss/oxide-win32-arm64-msvc": {
"version": "4.1.11",
"resolved": "https://registry.npmjs.org/@tailwindcss/oxide-win32-arm64-msvc/-/oxide-win32-arm64-msvc-4.1.11.tgz",
@@ -14,6 +14,7 @@ import {
isStatusUpdate,
} from "#/types/core/guards";
import { AgentState } from "#/types/agent-state";
import EventLogger from "#/utils/event-logger";
import {
renderConversationErroredToast,
renderConversationCreatedToast,
+1 -1
View File
@@ -222,7 +222,7 @@ function AppSettingsScreen() {
className="w-full max-w-[680px]" // Match the width of the language field
/>
<div className="border-t border-t-tertiary pt-6 mt-2">
<div className="border-t border-t-tertiary pt-6 mt-2 hidden">
<h3 className="text-lg font-medium mb-4">
{t(I18nKey.SETTINGS$GIT_SETTINGS)}
</h3>
@@ -106,10 +106,15 @@ class CodeActAgent(Agent):
def _get_tools(self) -> list['ChatCompletionToolParam']:
# For these models, we use short tool descriptions ( < 1024 tokens)
# to avoid hitting the OpenAI token limit for tool descriptions.
SHORT_TOOL_DESCRIPTION_LLM_SUBSTRS = ['gpt-', 'o3', 'o1', 'o4']
SHORT_TOOL_DESCRIPTION_LLM_SUBSTRS = ['gpt-4', 'o3', 'o1', 'o4']
use_short_tool_desc = False
if self.llm is not None:
# For historical reasons, previously OpenAI enforces max function description length of 1k characters
# https://community.openai.com/t/function-call-description-max-length/529902
# But it no longer seems to be an issue recently
# https://community.openai.com/t/was-the-character-limit-for-schema-descriptions-upgraded/1225975
# Tested on GPT-5 and longer description still works. But we still keep the logic to be safe for older models.
use_short_tool_desc = any(
model_substr in self.llm.config.model
for model_substr in SHORT_TOOL_DESCRIPTION_LLM_SUBSTRS
+11 -1
View File
@@ -1,3 +1,4 @@
import re
import sys
from litellm import ChatCompletionToolParam, ChatCompletionToolParamFunctionChunk
@@ -37,7 +38,16 @@ _SHORT_BASH_DESCRIPTION = """Execute a bash command in the terminal.
def refine_prompt(prompt: str):
if sys.platform == 'win32':
return prompt.replace('bash', 'powershell')
# Replace 'bash' with 'powershell' including tool names like 'execute_bash'
# First replace 'execute_bash' with 'execute_powershell' to handle tool names
result = re.sub(
r'\bexecute_bash\b', 'execute_powershell', prompt, flags=re.IGNORECASE
)
# Then replace standalone 'bash' with 'powershell'
result = re.sub(
r'(?<!execute_)(?<!_)\bbash\b', 'powershell', result, flags=re.IGNORECASE
)
return result
return prompt
+2 -37
View File
@@ -236,43 +236,8 @@ async def run_session(
)
return
# Get the pending action from the agent controller
pending_action = controller._pending_action
command = ''
if pending_action:
if hasattr(pending_action, 'command'):
command = pending_action.command
elif hasattr(pending_action, 'code'):
command = pending_action.code
confirmation_status = await read_confirmation_input(
config, command, pending_action
)
# Handle different confirmation responses
if confirmation_status == 'always':
# Set always confirm mode to skip future confirmations
always_confirm_mode = True
event_stream.add_event(
ChangeAgentStateAction(AgentState.USER_CONFIRMED),
EventSource.USER,
)
elif confirmation_status.startswith('remember:'):
# Parse the remember response: remember:pattern:description
parts = confirmation_status.split(':', 2)
if len(parts) == 3:
_, pattern, description = parts
# Save the command pattern to config
from openhands.cli.utils import save_approved_command_to_config
save_approved_command_to_config(
command, pattern=pattern, description=description
)
event_stream.add_event(
ChangeAgentStateAction(AgentState.USER_CONFIRMED),
EventSource.USER,
)
elif confirmation_status == 'yes':
confirmation_status = await read_confirmation_input(config)
if confirmation_status in ('yes', 'always'):
event_stream.add_event(
ChangeAgentStateAction(AgentState.USER_CONFIRMED),
EventSource.USER,
+110 -216
View File
@@ -6,6 +6,7 @@ import asyncio
import contextlib
import datetime
import json
import re
import sys
import threading
import time
@@ -36,6 +37,7 @@ from openhands.events import EventSource, EventStream
from openhands.events.action import (
Action,
ActionConfirmationStatus,
AgentFinishAction,
ChangeAgentStateAction,
CmdRunAction,
MCPAction,
@@ -65,10 +67,16 @@ MAX_RECENT_THOUGHTS = 5
# Color and styling constants
COLOR_GOLD = '#FFD700'
COLOR_GREY = '#808080'
COLOR_SUCCESS_GREEN = '#00D787' # Bright green for finish actions
COLOR_AGENT_BLUE = '#5FAFFF' # Soft blue for agent messages
COLOR_FINISH_FRAME = '#00AF87' # Darker green for finish action frames
DEFAULT_STYLE = Style.from_dict(
{
'gold': COLOR_GOLD,
'grey': COLOR_GREY,
'success-green': COLOR_SUCCESS_GREEN,
'agent-blue': COLOR_AGENT_BLUE,
'finish-frame': COLOR_FINISH_FRAME,
'prompt': f'{COLOR_GOLD} bold',
}
)
@@ -252,7 +260,10 @@ def display_thought_if_new(thought: str) -> None:
def display_event(event: Event, config: OpenHandsConfig) -> None:
global streaming_output_text_area
with print_lock:
if isinstance(event, CmdRunAction):
if isinstance(event, AgentFinishAction):
# Handle agent finish actions with special styling
display_agent_finish(event)
elif isinstance(event, CmdRunAction):
# For CmdRunAction, display thought first, then command
if hasattr(event, 'thought') and event.thought:
display_message(event.thought)
@@ -275,8 +286,8 @@ def display_event(event: Event, config: OpenHandsConfig) -> None:
if isinstance(event, MessageAction):
if event.source == EventSource.AGENT:
# Check if this message content is a duplicate thought
display_thought_if_new(event.content)
# Display agent messages with distinctive styling
display_agent_message(event.content)
elif isinstance(event, CmdOutputObservation):
display_command_output(event.content)
elif isinstance(event, FileEditObservation):
@@ -291,6 +302,29 @@ def display_event(event: Event, config: OpenHandsConfig) -> None:
display_error(event.content)
def process_markdown_for_terminal(text: str) -> str:
"""
Process markdown syntax for terminal display.
This function handles common markdown patterns like bold, italic, code blocks, etc.
"""
if not text:
return text
# Process bold text (**text**)
text = re.sub(r'\*\*(.*?)\*\*', r'\1', text)
# Process italic text (*text*)
text = re.sub(r'\*(.*?)\*', r'\1', text)
# Process inline code (`code`)
text = re.sub(r'`(.*?)`', r'\1', text)
# Process code blocks
text = re.sub(r'```(?:\w+)?\n(.*?)\n```', r'\1', text, flags=re.DOTALL)
return text
def display_message(message: str) -> None:
message = message.strip()
@@ -298,6 +332,76 @@ def display_message(message: str) -> None:
print_formatted_text(f'\n{message}')
def display_agent_message(message: str) -> None:
"""Display a message from the agent with distinctive styling and markdown rendering."""
message = message.strip()
if message:
# Process markdown in the message
try:
# Process markdown for terminal display
processed_message = process_markdown_for_terminal(message)
except Exception:
# If markdown processing fails, use the original message
processed_message = message
container = Frame(
TextArea(
text=processed_message,
read_only=True,
style=COLOR_AGENT_BLUE,
wrap_lines=True,
),
title='Agent Message',
style=f'fg:{COLOR_AGENT_BLUE}',
)
print_formatted_text('')
print_container(container)
def display_agent_finish(event: AgentFinishAction) -> None:
"""Display an agent finish action with distinctive styling and markdown rendering."""
# Determine the message to display
if event.final_thought:
message = event.final_thought
elif event.thought:
message = event.thought
else:
message = "All done! What's next on the agenda?"
# Add task completion status if available
if event.task_completed:
status_map = {
'true': '✅ Task completed successfully',
'partial': '⚠️ Task partially completed',
'false': '❌ Task could not be completed',
}
status_text = status_map.get(event.task_completed.value, '')
if status_text:
message = f'{status_text}\n\n{message}'
# Process markdown in the message
try:
# Process markdown for terminal display
processed_message = process_markdown_for_terminal(message)
except Exception:
# If markdown processing fails, use the original message
processed_message = message
container = Frame(
TextArea(
text=processed_message,
read_only=True,
style=COLOR_SUCCESS_GREEN,
wrap_lines=True,
),
title='🎯 Agent Finished',
style=f'fg:{COLOR_FINISH_FRAME}',
)
print_formatted_text('')
print_container(container)
def display_error(error: str) -> None:
error = error.strip()
@@ -700,140 +804,12 @@ async def read_prompt_input(
return '/exit'
def _generate_single_command_pattern(command: str) -> list[str]:
"""Generate candidate regex patterns for a single command based on prefixes.
Args:
command: The single command to generate patterns for.
Returns:
list[str]: List of candidate regex patterns that match similar commands.
"""
import re
# Split command into parts
parts = command.split()
if not parts:
return [f'^{re.escape(command)}$']
patterns = []
# Generate patterns for first word, first two words, first three words
for i in range(1, min(4, len(parts) + 1)):
prefix_parts = parts[:i]
escaped_prefix = '\\s+'.join(re.escape(part) for part in prefix_parts)
# Create pattern: prefix followed by word boundary and anything until end of line
# This prevents matching partial words (e.g., "ls" won't match "lsof")
if i == 1:
# For single word, add word boundary to prevent partial matches
pattern = f'^{escaped_prefix}(\\s.*|$)'
else:
# For multiple words, the space already acts as a boundary
pattern = f'^{escaped_prefix}.*$'
patterns.append(pattern)
return patterns
def _parse_piped_command(command: str) -> list[str]:
"""Parse a command that may contain pipes into individual commands.
Args:
command: The command string to parse.
Returns:
list[str]: List of individual commands split by pipes.
"""
import shlex
# Handle edge cases first
if not command or not command.strip():
return []
# Use shlex to split the command, which handles quotes and escapes properly
try:
# Split the entire command into tokens
tokens = shlex.split(command, posix=True)
except ValueError:
# If shlex fails (e.g., unmatched quotes), fall back to simple split
return [part.strip() for part in command.split('|') if part.strip()]
# Find pipe tokens and split the command accordingly
parts = []
current_part: list[str] = []
for token in tokens:
if token == '|':
if current_part:
parts.append(' '.join(current_part))
current_part = []
else:
current_part.append(token)
# Add the last part if it exists
if current_part:
parts.append(' '.join(current_part))
# If no pipes were found in tokens, check if the original command has pipes
# This handles cases where pipes are not separated by spaces
if len(parts) <= 1 and '|' in command:
return [part.strip() for part in command.split('|') if part.strip()]
return parts
def _generate_command_patterns(command: str) -> list[str]:
"""Generate candidate regex patterns for a command that can match similar commands.
This function handles both simple commands and piped commands.
For piped commands, it generates patterns for each sub-command.
Args:
command: The command to generate patterns for (may contain pipes).
Returns:
list[str]: List of candidate regex patterns that match similar commands.
"""
# Parse the command to handle pipes
sub_commands = _parse_piped_command(command)
if len(sub_commands) <= 1:
# Single command or empty, return all candidate patterns
return _generate_single_command_pattern(command)
else:
# Piped command, generate pattern for each sub-command
# For simplicity, we'll just use the first pattern from each sub-command
sub_patterns = []
for sub_command in sub_commands:
patterns = _generate_single_command_pattern(sub_command.strip())
if patterns:
# Use the most specific pattern (first one)
sub_pattern = patterns[0]
# Remove the ^ and $ anchors from sub-patterns
if sub_pattern.startswith('^'):
sub_pattern = sub_pattern[1:]
if sub_pattern.endswith('$'):
sub_pattern = sub_pattern[:-1]
sub_patterns.append(sub_pattern)
if sub_patterns:
# Join sub-patterns with pipe separator (allowing flexible whitespace around pipes)
pattern = '\\s*\\|\\s*'.join(sub_patterns)
return [f'^{pattern}$']
else:
return []
async def read_confirmation_input(
config: OpenHandsConfig, command: str = '', pending_action=None
) -> str:
async def read_confirmation_input(config: OpenHandsConfig) -> str:
try:
choices = [
'Yes, proceed',
'No (and allow to enter instructions)',
'Always proceed (skip all confirmations)',
"Yes, and don't ask again for similar commands",
"Always proceed (don't ask again)",
]
# keep the outer coroutine responsive by using asyncio.to_thread which puts the blocking call app.run() of cli_confirm() in a separate thread
@@ -841,89 +817,7 @@ async def read_confirmation_input(
cli_confirm, config, 'Choose an option:', choices
)
result = {0: 'yes', 1: 'no', 2: 'always', 3: 'remember'}.get(index, 'no')
# If the user chose "remember", show pattern selection options
if result == 'remember' and command:
return await _handle_pattern_selection(config, command)
return result
except (KeyboardInterrupt, EOFError):
return 'no'
async def _handle_pattern_selection(config: OpenHandsConfig, command: str) -> str:
"""Handle pattern selection when user chooses to remember similar commands."""
try:
# Generate candidate patterns
patterns = _generate_command_patterns(command)
# Create pattern descriptions
pattern_choices = []
pattern_data = []
# Add exact command option
import re
exact_pattern = f'^{re.escape(command)}$'
pattern_choices.append(f'Exact command: {command}')
pattern_data.append(('exact', exact_pattern, f'Exact command: {command}'))
# Add prefix-based patterns
parts = command.split()
if parts:
for i, pattern in enumerate(patterns):
if i < len(parts):
prefix = ' '.join(parts[: i + 1])
description = f'Commands starting with: {prefix}'
pattern_choices.append(description)
pattern_data.append(('pattern', pattern, description))
# Add custom pattern option
pattern_choices.append('Enter custom pattern')
pattern_data.append(('custom', '', 'Custom pattern'))
# Show pattern selection menu
pattern_index = await asyncio.to_thread(
cli_confirm, config, 'Choose which commands to remember:', pattern_choices
)
if pattern_index < len(pattern_data):
pattern_type, pattern_value, description = pattern_data[pattern_index]
if pattern_type == 'custom':
# Get custom pattern from user
prompt_session = create_prompt_session(config)
print_formatted_text(
HTML(
'<gold>Enter a custom regex pattern (example: ^ls.*$ for all ls commands):</gold>'
)
)
try:
custom_pattern = await prompt_session.prompt_async('Pattern: ')
if custom_pattern.strip():
# Validate the regex pattern
import re
try:
re.compile(custom_pattern.strip())
return f'remember:{custom_pattern.strip()}:Custom pattern: {custom_pattern.strip()}'
except re.error:
print_formatted_text(
HTML(
'<ansired>Invalid regex pattern. Using exact command instead.</ansired>'
)
)
return f'remember:^{re.escape(command)}$:Exact command: {command}'
else:
return 'no'
except (KeyboardInterrupt, EOFError):
return 'no'
else:
return f'remember:{pattern_value}:{description}'
return 'no'
return {0: 'yes', 1: 'no', 2: 'always'}.get(index, 'no')
except (KeyboardInterrupt, EOFError):
return 'no'
-79
View File
@@ -243,82 +243,3 @@ def read_file(file_path: str | Path) -> str:
def write_to_file(file_path: str | Path, content: str) -> None:
with open(file_path, 'w') as f:
f.write(content)
def save_approved_command_to_config(
command: str, pattern: str | None = None, description: str | None = None
) -> None:
"""Save an approved command or pattern to the config file.
Args:
command: The command to save as approved.
pattern: Optional regex pattern to save instead of exact command.
description: Optional description for the pattern.
"""
config_path = _LOCAL_CONFIG_FILE_PATH
# Load existing config or create a new one
if config_path.exists():
try:
with open(config_path, 'r') as f:
config_data = toml.load(f)
except Exception as e:
from openhands.core.logger import openhands_logger
openhands_logger.warning(f'Error loading config file: {e}')
config_data = {}
else:
config_data = {}
config_path.parent.mkdir(parents=True, exist_ok=True)
# Ensure security section exists
if 'security' not in config_data:
config_data['security'] = {}
if pattern and description:
# Save as a pattern
if 'approved_command_patterns' not in config_data['security']:
config_data['security']['approved_command_patterns'] = []
# Check if pattern already exists
pattern_exists = any(
p == pattern
for p in config_data['security']['approved_command_patterns']
if isinstance(p, str)
) or any(
p.get('pattern') == pattern
for p in config_data['security']['approved_command_patterns']
if isinstance(p, dict)
)
if not pattern_exists:
# Just save the pattern string directly
config_data['security']['approved_command_patterns'].append(pattern)
from openhands.core.logger import openhands_logger
openhands_logger.info(
f"Pattern '{pattern}' saved to approved command patterns in {config_path}"
)
else:
# Save as exact command
if 'approved_commands' not in config_data['security']:
config_data['security']['approved_commands'] = {}
# Add the command to the approved commands
config_data['security']['approved_commands'][command] = True
from openhands.core.logger import openhands_logger
openhands_logger.info(
f"Command '{command}' saved to approved commands in {config_path}"
)
# Write the updated config back to the file
try:
with open(config_path, 'w') as f:
toml.dump(config_data, f)
except Exception as e:
from openhands.core.logger import openhands_logger
openhands_logger.error(f'Error saving approved command to config: {e}')
+3 -34
View File
@@ -875,40 +875,9 @@ class AgentController:
if self.state.confirmation_mode and (
type(action) is CmdRunAction or type(action) is IPythonRunCellAction
):
# Check if the command is already approved
command = ''
if type(action) is CmdRunAction:
command = action.command
elif type(action) is IPythonRunCellAction:
command = action.code
# Get the security config
import toml
from openhands.core.config import SecurityConfig
# Load security config from the config file
security_config = SecurityConfig()
try:
with open('config.toml', 'r', encoding='utf-8') as f:
config_data = toml.load(f)
if 'security' in config_data:
security_config = SecurityConfig.model_validate(
config_data['security']
)
except Exception:
# If loading fails, use default config
pass
# Check if the command is approved
if security_config.is_command_approved(command):
# Command is already approved, no need for confirmation
action.confirmation_state = ActionConfirmationStatus.CONFIRMED
else:
# Command needs confirmation
action.confirmation_state = (
ActionConfirmationStatus.AWAITING_CONFIRMATION
)
action.confirmation_state = (
ActionConfirmationStatus.AWAITING_CONFIRMATION
)
self._pending_action = action
if not isinstance(action, NullAction):
+1 -47
View File
@@ -1,8 +1,5 @@
from pydantic import BaseModel, ConfigDict, Field, ValidationError
# ApprovedCommandPattern is now just a string containing the regex pattern
ApprovedCommandPattern = str
class SecurityConfig(BaseModel):
"""Configuration for security related functionalities.
@@ -10,33 +7,13 @@ class SecurityConfig(BaseModel):
Attributes:
confirmation_mode: Whether to enable confirmation mode.
security_analyzer: The security analyzer to use.
approved_command_patterns: List of regex patterns for commands that don't require confirmation.
approved_commands: Dictionary of exact commands that have been approved.
"""
confirmation_mode: bool = Field(default=False)
security_analyzer: str | None = Field(default=None)
approved_command_patterns: list[ApprovedCommandPattern] = Field(
default_factory=list
)
approved_commands: dict[str, bool] = Field(default_factory=dict)
model_config = ConfigDict(extra='forbid')
def is_command_approved(self, command: str) -> bool:
"""Check if a command is approved.
This is a stub method that always returns False.
The actual implementation is in CommandApprovalAnalyzer.
Args:
command: The command to check.
Returns:
bool: Always False in this stub implementation.
"""
return False
@classmethod
def from_toml_section(cls, data: dict) -> dict[str, 'SecurityConfig']:
"""
@@ -51,32 +28,9 @@ class SecurityConfig(BaseModel):
# Initialize the result mapping
security_mapping: dict[str, SecurityConfig] = {}
# Extract approved command patterns if present
approved_patterns = []
if 'approved_command_patterns' in data:
patterns_data = data.pop('approved_command_patterns')
if isinstance(patterns_data, list):
for pattern_data in patterns_data:
# Handle the new format (just a string pattern)
if isinstance(pattern_data, str):
approved_patterns.append(pattern_data)
# Handle the old format (dict with pattern and description)
elif isinstance(pattern_data, dict) and 'pattern' in pattern_data:
approved_patterns.append(pattern_data['pattern'])
# Extract approved commands if present
approved_commands = {}
if 'approved_commands' in data:
commands_data = data.pop('approved_commands')
if isinstance(commands_data, dict):
approved_commands = commands_data
# Try to create the configuration instance
try:
config = cls.model_validate(data)
config.approved_command_patterns = approved_patterns
config.approved_commands = approved_commands
security_mapping['security'] = config
security_mapping['security'] = cls.model_validate(data)
except ValidationError as e:
raise ValueError(f'Invalid security configuration: {e}')
+1 -1
View File
@@ -234,7 +234,7 @@ async def run_controller(
file_path = config.save_trajectory_path
os.makedirs(os.path.dirname(file_path), exist_ok=True)
histories = controller.get_trajectory(config.save_screenshots_in_trajectory)
with open(file_path, 'w') as f: # noqa: ASYNC101
with open(file_path, 'w') as f: # noqa
json.dump(histories, f, indent=4)
return state
+1 -1
View File
@@ -383,7 +383,7 @@ Do NOT assume the environment is the same as in the example above.
"""
example = example.lstrip()
return example
return refine_prompt(example)
IN_CONTEXT_LEARNING_EXAMPLE_PREFIX = get_example_for_tools
+8 -8
View File
@@ -571,7 +571,7 @@ class IssueResolver:
# checkout the repo
repo_dir = os.path.join(self.output_dir, 'repo')
if not os.path.exists(repo_dir):
checkout_output = subprocess.check_output( # noqa: ASYNC101
checkout_output = subprocess.check_output( # noqa
[
'git',
'clone',
@@ -584,7 +584,7 @@ class IssueResolver:
# get the commit id of current repo for reproducibility
base_commit = (
subprocess.check_output(['git', 'rev-parse', 'HEAD'], cwd=repo_dir) # noqa: ASYNC101
subprocess.check_output(['git', 'rev-parse', 'HEAD'], cwd=repo_dir) # noqa
.decode('utf-8')
.strip()
)
@@ -596,7 +596,7 @@ class IssueResolver:
repo_dir, '.openhands_instructions'
)
if os.path.exists(openhands_instructions_path):
with open(openhands_instructions_path, 'r') as f: # noqa: ASYNC101
with open(openhands_instructions_path, 'r') as f: # noqa
self.repo_instruction = f.read()
# OUTPUT FILE
@@ -605,7 +605,7 @@ class IssueResolver:
# Check if this issue was already processed
if os.path.exists(output_file):
with open(output_file, 'r') as f: # noqa: ASYNC101
with open(output_file, 'r') as f: # noqa
for line in f:
data = ResolverOutput.model_validate_json(line)
if data.issue.number == self.issue_number:
@@ -614,7 +614,7 @@ class IssueResolver:
)
return
output_fp = open(output_file, 'a') # noqa: ASYNC101
output_fp = open(output_file, 'a') # noqa
logger.info(
f'Resolving issue {self.issue_number} with Agent {AGENT_CLASS}, model {model_name}, max iterations {self.max_iterations}.'
@@ -633,20 +633,20 @@ class IssueResolver:
# Fetch the branch first to ensure it exists locally
fetch_cmd = ['git', 'fetch', 'origin', branch_to_use]
subprocess.check_output( # noqa: ASYNC101
subprocess.check_output( # noqa
fetch_cmd,
cwd=repo_dir,
)
# Checkout the branch
checkout_cmd = ['git', 'checkout', branch_to_use]
subprocess.check_output( # noqa: ASYNC101
subprocess.check_output( # noqa
checkout_cmd,
cwd=repo_dir,
)
base_commit = (
subprocess.check_output(['git', 'rev-parse', 'HEAD'], cwd=repo_dir) # noqa: ASYNC101
subprocess.check_output(['git', 'rev-parse', 'HEAD'], cwd=repo_dir) # noqa
.decode('utf-8')
.strip()
)
@@ -69,7 +69,7 @@ class JupyterPlugin(Plugin):
# Using synchronous subprocess.Popen for Windows as asyncio.create_subprocess_shell
# has limitations on Windows platforms
self.gateway_process = subprocess.Popen( # type: ignore[ASYNC101] # noqa: ASYNC101
self.gateway_process = subprocess.Popen( # type: ignore[ASYNC101] # noqa
jupyter_launch_command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
@@ -82,19 +82,19 @@ class JupyterPlugin(Plugin):
output = ''
while should_continue():
if self.gateway_process.stdout is None:
time.sleep(1) # type: ignore[ASYNC101] # noqa: ASYNC101
time.sleep(1) # type: ignore[ASYNC101] # noqa
continue
line = self.gateway_process.stdout.readline()
if not line:
time.sleep(1) # type: ignore[ASYNC101] # noqa: ASYNC101
time.sleep(1) # type: ignore[ASYNC101] # noqa
continue
output += line
if 'at' in line:
break
time.sleep(1) # type: ignore[ASYNC101] # noqa: ASYNC101
time.sleep(1) # type: ignore[ASYNC101] # noqa
logger.debug('Waiting for jupyter kernel gateway to start...')
logger.debug(
+2 -2
View File
@@ -86,7 +86,7 @@ async def read_file(
)
try:
with open(whole_path, 'r', encoding='utf-8') as file: # noqa: ASYNC101
with open(whole_path, 'r', encoding='utf-8') as file: # noqa
lines = read_lines(file.readlines(), start, end)
except FileNotFoundError:
return ErrorObservation(f'File not found: {path}')
@@ -127,7 +127,7 @@ async def write_file(
os.makedirs(os.path.dirname(whole_path))
mode = 'w' if not os.path.exists(whole_path) else 'r+'
try:
with open(whole_path, mode, encoding='utf-8') as file: # noqa: ASYNC101
with open(whole_path, mode, encoding='utf-8') as file: # noqa
if mode != 'w':
all_lines = file.readlines()
new_file = insert_lines(insert, all_lines, start, end)
-2
View File
@@ -1,9 +1,7 @@
from openhands.security.analyzer import SecurityAnalyzer
from openhands.security.command_approval.analyzer import CommandApprovalAnalyzer
from openhands.security.invariant.analyzer import InvariantAnalyzer
__all__ = [
'SecurityAnalyzer',
'InvariantAnalyzer',
'CommandApprovalAnalyzer',
]
@@ -1,3 +0,0 @@
from openhands.security.command_approval.analyzer import CommandApprovalAnalyzer
__all__ = ['CommandApprovalAnalyzer']
@@ -1,305 +0,0 @@
"""Command approval analyzer for security."""
import re
from typing import Any
import bashlex
from fastapi import Request
from openhands.core.logger import openhands_logger as logger
from openhands.events.action.action import (
Action,
ActionConfirmationStatus,
ActionSecurityRisk,
)
from openhands.events.action.commands import CmdRunAction, IPythonRunCellAction
from openhands.events.event import Event
from openhands.events.stream import EventStream
from openhands.security.analyzer import SecurityAnalyzer
class CommandPattern:
"""A pattern for matching commands."""
def __init__(self, pattern: str, description: str):
"""Initialize a new command pattern.
Args:
pattern: The regex pattern to match commands against.
description: A human-readable description of what this pattern matches.
"""
self.pattern = pattern
self.description = description
self._compiled_pattern = re.compile(pattern)
def matches(self, command: str) -> bool:
"""Check if the command matches this pattern.
Args:
command: The command to check.
Returns:
bool: True if the command matches, False otherwise.
"""
return bool(self._compiled_pattern.match(command))
class CommandParser:
"""Parser for bash commands using bashlex."""
def is_piped_command(self, command: str) -> bool:
"""Check if a command contains pipes.
Args:
command: The command to check.
Returns:
bool: True if the command contains pipes, False otherwise.
"""
if not command or not command.strip():
return False
try:
parts = bashlex.parse(command)
for part in parts:
if part.kind == 'pipeline':
return True
return False
except Exception as e:
logger.warning(f'Error parsing command with bashlex: {e}')
# Fallback: check for pipe character not in quotes
# This is a simple heuristic and not as accurate as bashlex parsing
in_single_quote = False
in_double_quote = False
for char in command:
if char == "'" and not in_double_quote:
in_single_quote = not in_single_quote
elif char == '"' and not in_single_quote:
in_double_quote = not in_double_quote
elif char == '|' and not in_single_quote and not in_double_quote:
return True
return False
def parse_command(self, command: str) -> list[str]:
"""Parse a command into individual parts, handling pipes.
Args:
command: The command to parse.
Returns:
List[str]: List of individual commands.
"""
if not command or not command.strip():
return []
try:
parts = bashlex.parse(command)
commands = []
# Helper function to extract command from a node
def extract_command(node):
if node.kind == 'command':
cmd_parts = []
for part in node.parts:
if hasattr(part, 'word'):
cmd_parts.append(part.word)
if cmd_parts:
return ' '.join(cmd_parts)
return None
# Process the AST
for part in parts:
if part.kind == 'pipeline':
# A pipeline has multiple commands
for subpart in part.parts:
if subpart.kind == 'command':
cmd = extract_command(subpart)
if cmd:
commands.append(cmd)
elif part.kind == 'command':
# A single command
cmd = extract_command(part)
if cmd:
commands.append(cmd)
elif part.kind == 'list':
# A list of commands (e.g., with && or ||)
# We only take the first command for approval purposes
for subpart in part.parts:
if subpart.kind == 'command':
cmd = extract_command(subpart)
if cmd:
commands.append(cmd)
break
elif subpart.kind == 'operator':
# Stop at the first operator
break
return commands
except Exception as e:
logger.warning(f'Error parsing command with bashlex: {e}')
# Fallback: simple split by pipe
# This is a simple heuristic and not as accurate as bashlex parsing
if '|' in command:
return [part.strip() for part in command.split('|') if part.strip()]
else:
return [command.strip()] if command.strip() else []
class CommandApprovalAnalyzer(SecurityAnalyzer):
"""Security analyzer that automatically approves commands based on patterns and previously approved commands."""
def __init__(
self,
event_stream: EventStream,
policy: str | None = None,
sid: str | None = None,
) -> None:
"""Initializes a new instance of the CommandApprovalAnalyzer class."""
super().__init__(event_stream)
self.parser = CommandParser()
self.approved_commands: dict[
str, bool
] = {} # Dict of exact commands that have been approved
self.approved_patterns: list[
CommandPattern
] = [] # List of regex patterns for approved commands
self.compiled_patterns: dict[
str, re.Pattern
] = {} # Cache of compiled regex patterns
# Add some default patterns
self._add_default_patterns()
def _add_default_patterns(self) -> None:
"""Add default command patterns that are always approved."""
# Simple, safe commands
self.approved_patterns.append(
CommandPattern(
pattern=r'^ls(\s+-[a-zA-Z]+)*(\s+\S+)*$',
description='List directory contents',
)
)
self.approved_patterns.append(
CommandPattern(pattern=r'^cd(\s+\S+)?$', description='Change directory')
)
self.approved_patterns.append(
CommandPattern(pattern=r'^pwd$', description='Print working directory')
)
self.approved_patterns.append(
CommandPattern(pattern=r'^echo\s+.*$', description='Echo text')
)
def is_command_approved(self, command: str) -> bool:
"""Check if a command is approved and doesn't need confirmation.
Args:
command: The command to check.
Returns:
bool: True if the command is approved, False otherwise.
"""
if not command or not command.strip():
return False
# Check if this is a piped command
if self.parser.is_piped_command(command):
# For piped commands, all parts must be approved
sub_commands = self.parser.parse_command(command)
return all(self._is_single_command_approved(cmd) for cmd in sub_commands)
else:
# For single commands, just check directly
return self._is_single_command_approved(command)
def _is_single_command_approved(self, command: str) -> bool:
"""Check if a single (non-piped) command is approved.
Args:
command: The command to check.
Returns:
bool: True if the command is approved, False otherwise.
"""
command = command.strip()
# Check exact matches first
if command in self.approved_commands:
return self.approved_commands[command]
# Then check patterns from CommandPattern objects
for pattern in self.approved_patterns:
if pattern.matches(command):
return True
# Then check string patterns from the config
from openhands.core.config import load_openhands_config
try:
config = load_openhands_config()
if hasattr(config, 'security') and hasattr(
config.security, 'approved_command_patterns'
):
for pattern_str in config.security.approved_command_patterns:
# Compile the pattern if not already compiled
if pattern_str not in self.compiled_patterns:
try:
self.compiled_patterns[pattern_str] = re.compile(
pattern_str
)
except re.error:
# Skip invalid patterns
continue
# Check if the command matches the pattern
if self.compiled_patterns[pattern_str].match(command):
return True
except Exception:
# If there's any error loading the config, just continue without checking patterns
pass
return False
def approve_command(self, command: str) -> None:
"""Add a command to the approved commands list.
Args:
command: The command to approve.
"""
self.approved_commands[command] = True
# In a real implementation, we would save this to config.toml
logger.info(f"Command '{command}' approved for future use")
async def handle_api_request(self, request: Request) -> Any:
"""Handles the incoming API request."""
# This analyzer doesn't need to handle API requests
return {'message': "Command approval analyzer doesn't support API requests"}
async def security_risk(self, event: Action) -> ActionSecurityRisk:
"""Evaluates the Action for security risks and returns the risk level.
For command approval analyzer, we always return LOW risk level,
but we set the confirmation_state based on whether the command is approved.
"""
# Only process CmdRunAction and IPythonRunCellAction
if isinstance(event, CmdRunAction):
command = event.command
if self.is_command_approved(command):
event.confirmation_state = ActionConfirmationStatus.CONFIRMED
logger.info(f'Command automatically approved: {command}')
elif isinstance(event, IPythonRunCellAction):
code = event.code
if self.is_command_approved(code):
event.confirmation_state = ActionConfirmationStatus.CONFIRMED
logger.info(f'Python code automatically approved: {code}')
# Always return LOW risk level - we're not evaluating risk, just auto-approving
return ActionSecurityRisk.LOW
async def act(self, event: Event) -> None:
"""Performs an action based on the analyzed event.
This analyzer doesn't need to perform any actions since command approval
is handled directly in the CLI interface.
"""
pass
-2
View File
@@ -1,8 +1,6 @@
from openhands.security.analyzer import SecurityAnalyzer
from openhands.security.command_approval.analyzer import CommandApprovalAnalyzer
from openhands.security.invariant.analyzer import InvariantAnalyzer
SecurityAnalyzers: dict[str, type[SecurityAnalyzer]] = {
'invariant': InvariantAnalyzer,
'command_approval': CommandApprovalAnalyzer,
}
+3 -1
View File
@@ -4,6 +4,7 @@ from itertools import islice
from jinja2 import Template
from openhands.agenthub.codeact_agent.tools.bash import refine_prompt
from openhands.controller.state.state import State
from openhands.core.message import Message, TextContent
from openhands.events.observation.agent import MicroagentKnowledge
@@ -91,7 +92,8 @@ class PromptManager:
return Template(file.read())
def get_system_message(self) -> str:
return self.system_template.render().strip()
system_message = self.system_template.render().strip()
return refine_prompt(system_message)
def get_example_user_message(self) -> str:
"""This is an initial user message that can be provided to the agent
Generated
+33 -2
View File
@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
[[package]]
name = "aiofiles"
@@ -5152,8 +5152,11 @@ files = [
{file = "lxml-5.4.0-cp36-cp36m-win_amd64.whl", hash = "sha256:7ce1a171ec325192c6a636b64c94418e71a1964f56d002cc28122fceff0b6121"},
{file = "lxml-5.4.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:795f61bcaf8770e1b37eec24edf9771b307df3af74d1d6f27d812e15a9ff3872"},
{file = "lxml-5.4.0-cp37-cp37m-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:29f451a4b614a7b5b6c2e043d7b64a15bd8304d7e767055e8ab68387a8cacf4e"},
{file = "lxml-5.4.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:891f7f991a68d20c75cb13c5c9142b2a3f9eb161f1f12a9489c82172d1f133c0"},
{file = "lxml-5.4.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4aa412a82e460571fad592d0f93ce9935a20090029ba08eca05c614f99b0cc92"},
{file = "lxml-5.4.0-cp37-cp37m-manylinux_2_28_aarch64.whl", hash = "sha256:ac7ba71f9561cd7d7b55e1ea5511543c0282e2b6450f122672a2694621d63b7e"},
{file = "lxml-5.4.0-cp37-cp37m-manylinux_2_28_x86_64.whl", hash = "sha256:c5d32f5284012deaccd37da1e2cd42f081feaa76981f0eaa474351b68df813c5"},
{file = "lxml-5.4.0-cp37-cp37m-musllinux_1_2_aarch64.whl", hash = "sha256:ce31158630a6ac85bddd6b830cffd46085ff90498b397bd0a259f59d27a12188"},
{file = "lxml-5.4.0-cp37-cp37m-musllinux_1_2_x86_64.whl", hash = "sha256:31e63621e073e04697c1b2d23fcb89991790eef370ec37ce4d5d469f40924ed6"},
{file = "lxml-5.4.0-cp37-cp37m-win32.whl", hash = "sha256:be2ba4c3c5b7900246a8f866580700ef0d538f2ca32535e991027bdaba944063"},
{file = "lxml-5.4.0-cp37-cp37m-win_amd64.whl", hash = "sha256:09846782b1ef650b321484ad429217f5154da4d6e786636c38e434fa32e94e49"},
@@ -5227,6 +5230,22 @@ files = [
[package.dependencies]
cobble = ">=0.1.3,<0.2"
[[package]]
name = "markdown"
version = "3.8.2"
description = "Python implementation of John Gruber's Markdown."
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "markdown-3.8.2-py3-none-any.whl", hash = "sha256:5c83764dbd4e00bdd94d85a19b8d55ccca20fe35b2e678a1422b380324dd5f24"},
{file = "markdown-3.8.2.tar.gz", hash = "sha256:247b9a70dd12e27f67431ce62523e675b866d254f900c4fe75ce3dda62237c45"},
]
[package.extras]
docs = ["mdx_gh_links (>=0.2)", "mkdocs (>=1.6)", "mkdocs-gen-files", "mkdocs-literate-nav", "mkdocs-nature (>=0.6)", "mkdocs-section-index", "mkdocstrings[python]"]
testing = ["coverage", "pyyaml"]
[[package]]
name = "markdown-it-py"
version = "3.0.0"
@@ -10446,6 +10465,18 @@ files = [
]
markers = {main = "extra == \"third-party-runtimes\""}
[[package]]
name = "types-markdown"
version = "3.8.0.20250708"
description = "Typing stubs for Markdown"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "types_markdown-3.8.0.20250708-py3-none-any.whl", hash = "sha256:d1f634931b463adf7603c012724b7e9e5eff976eb517dc700ebece2d6189b1ce"},
{file = "types_markdown-3.8.0.20250708.tar.gz", hash = "sha256:28690251fe90757f5a99cd671c79502bc2de07aef2d35fe54117c3b1c799804a"},
]
[[package]]
name = "types-python-dateutil"
version = "2.9.0.20250516"
@@ -11766,4 +11797,4 @@ third-party-runtimes = ["daytona", "e2b", "modal", "runloop-api-client"]
[metadata]
lock-version = "2.1"
python-versions = "^3.12,<3.14"
content-hash = "8568c6ec2e11d4fcb23e206a24896b4d2d50e694c04011b668148f484e95b406"
content-hash = "0a2be134709df49a9e5132fdf0ec887f2a8cb99be0ed244349be638cbb48364b"
+2
View File
@@ -42,6 +42,8 @@ numpy = "*"
json-repair = "*"
browsergym-core = "0.13.3" # integrate browsergym-core as the browsing interface
html2text = "*"
markdown = "*" # For markdown processing in CLI
types-Markdown = "*" # Type stubs for markdown
deprecated = "*"
pexpect = "*"
jinja2 = "^3.1.3"
+4 -4
View File
@@ -145,8 +145,8 @@ class TestThoughtDisplayOrder:
# Verify that final thought is displayed
mock_display_message.assert_called_once_with('This is a final thought.')
@patch('openhands.cli.tui.display_message')
def test_message_action_from_agent(self, mock_display_message):
@patch('openhands.cli.tui.display_agent_message')
def test_message_action_from_agent(self, mock_display_agent_message):
"""Test that MessageAction from agent is displayed."""
config = MagicMock(spec=OpenHandsConfig)
@@ -156,8 +156,8 @@ class TestThoughtDisplayOrder:
display_event(message_action, config)
# Verify that message is displayed
mock_display_message.assert_called_once_with('Hello from agent')
# Verify that agent message is displayed
mock_display_agent_message.assert_called_once_with('Hello from agent')
@patch('openhands.cli.tui.display_message')
def test_message_action_from_user_not_displayed(self, mock_display_message):
+51 -3
View File
@@ -6,6 +6,8 @@ from openhands.cli.tui import (
CustomDiffLexer,
UsageMetrics,
UserCancelledError,
display_agent_finish,
display_agent_message,
display_banner,
display_command,
display_event,
@@ -26,6 +28,7 @@ from openhands.events import EventSource
from openhands.events.action import (
Action,
ActionConfirmationStatus,
AgentFinishAction,
CmdRunAction,
MCPAction,
MessageAction,
@@ -107,15 +110,15 @@ class TestDisplayFunctions:
assert 'What do you want to build?' in message_text
assert 'Type /help for help' in message_text
@patch('openhands.cli.tui.display_message')
def test_display_event_message_action(self, mock_display_message):
@patch('openhands.cli.tui.display_agent_message')
def test_display_event_message_action(self, mock_display_agent_message):
config = MagicMock(spec=OpenHandsConfig)
message = MessageAction(content='Test message')
message._source = EventSource.AGENT
display_event(message, config)
mock_display_message.assert_called_once_with('Test message')
mock_display_agent_message.assert_called_once_with('Test message')
@patch('openhands.cli.tui.display_command')
def test_display_event_cmd_action(self, mock_display_command):
@@ -182,6 +185,15 @@ class TestDisplayFunctions:
mock_display_message.assert_called_once_with('Thinking about this...')
@patch('openhands.cli.tui.display_agent_finish')
def test_display_event_agent_finish(self, mock_display_agent_finish):
config = MagicMock(spec=OpenHandsConfig)
finish_action = AgentFinishAction(final_thought='Task completed')
display_event(finish_action, config)
mock_display_agent_finish.assert_called_once_with(finish_action)
@patch('openhands.cli.tui.display_mcp_action')
def test_display_event_mcp_action(self, mock_display_mcp_action):
config = MagicMock(spec=OpenHandsConfig)
@@ -256,6 +268,42 @@ class TestDisplayFunctions:
args, kwargs = mock_print.call_args
assert message in str(args[0])
@patch('openhands.cli.tui.print_container')
@patch('openhands.cli.tui.print_formatted_text')
def test_display_agent_message(self, mock_print_formatted, mock_print_container):
message = 'Agent message'
display_agent_message(message)
mock_print_formatted.assert_called_once()
mock_print_container.assert_called_once()
@patch('openhands.cli.tui.print_container')
@patch('openhands.cli.tui.print_formatted_text')
def test_display_agent_finish_with_thought(
self, mock_print_formatted, mock_print_container
):
finish_action = AgentFinishAction(thought='Final thought')
display_agent_finish(finish_action)
mock_print_formatted.assert_called_once()
mock_print_container.assert_called_once()
@patch('openhands.cli.tui.print_container')
@patch('openhands.cli.tui.print_formatted_text')
def test_display_agent_finish_with_task_completed(
self, mock_print_formatted, mock_print_container
):
from openhands.events.action.agent import AgentFinishTaskCompleted
finish_action = AgentFinishAction()
finish_action.task_completed = AgentFinishTaskCompleted.TRUE
display_agent_finish(finish_action)
mock_print_formatted.assert_called_once()
mock_print_container.assert_called_once()
@patch('openhands.cli.tui.print_container')
def test_display_command_awaiting_confirmation(self, mock_print_container):
cmd_action = CmdRunAction(command='echo test')
-373
View File
@@ -1,373 +0,0 @@
"""Tests for command pattern generation and parsing."""
import re
from unittest.mock import MagicMock, patch
import pytest
from openhands.cli.tui import (
_generate_command_patterns,
_generate_single_command_pattern,
_handle_pattern_selection,
_parse_piped_command,
read_confirmation_input,
)
class TestGenerateSingleCommandPattern:
"""Test the _generate_single_command_pattern function."""
def test_empty_command(self):
"""Test pattern generation for empty command."""
patterns = _generate_single_command_pattern('')
assert len(patterns) == 1
assert patterns[0] == '^$'
def test_single_word_command(self):
"""Test pattern generation for single word command."""
patterns = _generate_single_command_pattern('ls')
assert len(patterns) == 1
assert patterns[0] == '^ls(\\s.*|$)'
def test_two_word_command(self):
"""Test pattern generation for two word command."""
patterns = _generate_single_command_pattern('ls -la')
assert len(patterns) == 2
assert patterns[0] == '^ls(\\s.*|$)'
assert patterns[1] == '^ls\\s+\\-la.*$'
def test_three_word_command(self):
"""Test pattern generation for three word command."""
patterns = _generate_single_command_pattern('git commit -m')
assert len(patterns) == 3
assert patterns[0] == '^git(\\s.*|$)'
assert patterns[1] == '^git\\s+commit.*$'
assert patterns[2] == '^git\\s+commit\\s+\\-m.*$'
def test_four_word_command(self):
"""Test pattern generation for four word command (should only generate 3 patterns)."""
patterns = _generate_single_command_pattern('git commit -m message')
assert len(patterns) == 3 # Only first 3 prefixes
assert patterns[0] == '^git(\\s.*|$)'
assert patterns[1] == '^git\\s+commit.*$'
assert patterns[2] == '^git\\s+commit\\s+\\-m.*$'
def test_pattern_matching(self):
"""Test that generated patterns actually match similar commands."""
patterns = _generate_single_command_pattern('ls -la')
# Test first pattern (ls.*)
pattern1 = re.compile(patterns[0])
assert pattern1.match('ls')
assert pattern1.match('ls -la')
assert pattern1.match('ls -alh /home')
assert not pattern1.match('cat file.txt')
# Test second pattern (ls -la.*)
pattern2 = re.compile(patterns[1])
assert pattern2.match('ls -la')
assert pattern2.match('ls -la /home')
assert not pattern2.match('ls')
assert not pattern2.match('ls -alh')
def test_special_characters_escaped(self):
"""Test that special regex characters are properly escaped."""
patterns = _generate_single_command_pattern('echo $HOME')
assert len(patterns) == 2
assert patterns[0] == '^echo(\\s.*|$)'
assert patterns[1] == '^echo\\s+\\$HOME.*$'
# Test that the pattern works
pattern = re.compile(patterns[1])
assert pattern.match('echo $HOME')
assert pattern.match('echo $HOME/test')
class TestParsePipedCommand:
"""Test the _parse_piped_command function."""
def test_empty_command(self):
"""Test parsing empty command."""
result = _parse_piped_command('')
assert result == []
def test_whitespace_only_command(self):
"""Test parsing whitespace-only command."""
result = _parse_piped_command(' ')
assert result == []
def test_single_command(self):
"""Test parsing single command without pipes."""
result = _parse_piped_command('ls -la')
assert result == ['ls -la']
def test_simple_piped_command(self):
"""Test parsing simple piped command."""
result = _parse_piped_command('ls -la | grep test')
assert result == ['ls -la', 'grep test']
def test_three_command_pipe(self):
"""Test parsing three-command pipe."""
result = _parse_piped_command('cat file.txt | grep pattern | wc -l')
assert result == ['cat file.txt', 'grep pattern', 'wc -l']
def test_pipe_with_quotes(self):
"""Test parsing piped command with quoted arguments."""
result = _parse_piped_command('echo "hello world" | grep "hello"')
# shlex removes quotes, so we get the unquoted content
assert result == ['echo hello world', 'grep hello']
def test_pipe_without_spaces(self):
"""Test parsing piped command without spaces around pipes."""
result = _parse_piped_command('ls|grep test')
assert result == ['ls', 'grep test']
def test_complex_command_with_options(self):
"""Test parsing complex command with various options."""
result = _parse_piped_command(
"find /home -name '*.py' | xargs grep -l 'import os'"
)
# shlex removes quotes, so we get the unquoted content
assert result == ['find /home -name *.py', 'xargs grep -l import os']
def test_invalid_quotes(self):
"""Test parsing command with invalid quotes falls back gracefully."""
result = _parse_piped_command('echo "unclosed quote | grep test')
assert result == ['echo "unclosed quote', 'grep test']
class TestGenerateCommandPatterns:
"""Test the _generate_command_patterns function."""
def test_single_command(self):
"""Test pattern generation for single command."""
patterns = _generate_command_patterns('ls -la')
assert len(patterns) == 2
assert patterns[0] == '^ls(\\s.*|$)'
assert patterns[1] == '^ls\\s+\\-la.*$'
def test_piped_command(self):
"""Test pattern generation for piped command."""
patterns = _generate_command_patterns('ls -la | grep test')
assert len(patterns) == 1
# Should combine the first pattern from each sub-command
assert patterns[0] == '^ls(\\s.*|$)\\s*\\|\\s*grep(\\s.*|$)$'
def test_empty_command(self):
"""Test pattern generation for empty command."""
patterns = _generate_command_patterns('')
assert len(patterns) == 1
assert patterns[0] == '^$'
class TestReadConfirmationInput:
"""Test the read_confirmation_input function."""
@pytest.mark.asyncio
@patch('openhands.cli.tui.cli_confirm')
async def test_yes_option(self, mock_confirm):
"""Test selecting 'yes' option."""
mock_confirm.return_value = 0 # First option (Yes, proceed)
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await read_confirmation_input(config=config, command='ls -la')
assert result == 'yes'
@pytest.mark.asyncio
@patch('openhands.cli.tui.cli_confirm')
async def test_no_option(self, mock_confirm):
"""Test selecting 'no' option."""
mock_confirm.return_value = 1 # Second option (No)
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await read_confirmation_input(config=config, command='ls -la')
assert result == 'no'
@pytest.mark.asyncio
@patch('openhands.cli.tui.cli_confirm')
async def test_always_option(self, mock_confirm):
"""Test selecting 'always' option."""
mock_confirm.return_value = 2 # Third option (Always proceed)
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await read_confirmation_input(config=config, command='ls -la')
assert result == 'always'
@pytest.mark.asyncio
@patch('openhands.cli.tui._handle_pattern_selection')
@patch('openhands.cli.tui.cli_confirm')
async def test_remember_option(self, mock_confirm, mock_pattern_selection):
"""Test selecting 'remember' option."""
mock_confirm.return_value = 3 # Fourth option (Remember)
mock_pattern_selection.return_value = (
'remember:^ls.*$:Commands starting with: ls'
)
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await read_confirmation_input(config=config, command='ls -la')
assert result == 'remember:^ls.*$:Commands starting with: ls'
mock_pattern_selection.assert_called_once_with(config, 'ls -la')
class TestHandlePatternSelection:
"""Test the _handle_pattern_selection function."""
@pytest.mark.asyncio
@patch('openhands.cli.tui.cli_confirm')
async def test_exact_command_selection(self, mock_confirm):
"""Test selecting exact command pattern."""
mock_confirm.return_value = 0 # First option (exact command)
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await _handle_pattern_selection(config, 'ls -la')
assert result.startswith('remember:^ls\\ \\-la$:Exact command: ls -la')
@pytest.mark.asyncio
@patch('openhands.cli.tui.cli_confirm')
async def test_prefix_pattern_selection(self, mock_confirm):
"""Test selecting prefix pattern."""
mock_confirm.return_value = 1 # Second option (first prefix pattern)
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await _handle_pattern_selection(config, 'ls -la')
assert result.startswith('remember:^ls(\\s.*|$):Commands starting with: ls')
@pytest.mark.asyncio
@patch('openhands.cli.tui.create_prompt_session')
@patch('openhands.cli.tui.cli_confirm')
async def test_custom_pattern_selection_valid(
self, mock_confirm, mock_create_session
):
"""Test selecting custom pattern with valid regex."""
mock_confirm.return_value = (
3 # Custom pattern option (assuming 3 total options)
)
# Mock the prompt session
mock_session = MagicMock()
# Create a proper async mock
async def mock_prompt_async(prompt):
return '^git.*$'
mock_session.prompt_async = mock_prompt_async
mock_create_session.return_value = mock_session
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await _handle_pattern_selection(config, 'ls -la')
assert result == 'remember:^git.*$:Custom pattern: ^git.*$'
@pytest.mark.asyncio
@patch('openhands.cli.tui.create_prompt_session')
@patch('openhands.cli.tui.cli_confirm')
@patch('openhands.cli.tui.print_formatted_text')
async def test_custom_pattern_selection_invalid(
self, mock_print, mock_confirm, mock_create_session
):
"""Test selecting custom pattern with invalid regex."""
mock_confirm.return_value = 3 # Custom pattern option
# Mock the prompt session to return invalid regex
mock_session = MagicMock()
# Create a proper async mock
async def mock_prompt_async(prompt):
return '[invalid regex'
mock_session.prompt_async = mock_prompt_async
mock_create_session.return_value = mock_session
config = MagicMock()
config.cli = MagicMock(vi_mode=False)
result = await _handle_pattern_selection(config, 'ls -la')
# Should fall back to exact command
assert result.startswith('remember:^ls\\ \\-la$:Exact command: ls -la')
# Should print error message
mock_print.assert_called()
class TestPatternMatching:
"""Test that the generated patterns work correctly for matching commands."""
def test_ls_patterns(self):
"""Test patterns generated for ls command."""
patterns = _generate_single_command_pattern('ls -alh')
# Test first pattern (ls.*)
pattern1 = re.compile(patterns[0])
assert pattern1.match('ls')
assert pattern1.match('ls -la')
assert pattern1.match('ls -alh /home')
assert pattern1.match('ls --help')
assert not pattern1.match('cat file.txt')
assert not pattern1.match('lsof') # Should not match partial word
# Test second pattern (ls -alh.*)
pattern2 = re.compile(patterns[1])
assert pattern2.match('ls -alh')
assert pattern2.match('ls -alh /home')
assert not pattern2.match('ls')
assert not pattern2.match('ls -la')
def test_git_patterns(self):
"""Test patterns generated for git command."""
patterns = _generate_single_command_pattern('git commit -m')
# Test git(\s.*|$)
pattern1 = re.compile(patterns[0])
assert pattern1.match('git status')
assert pattern1.match('git commit')
assert pattern1.match('git push origin main')
assert not pattern1.match('github') # Should not match partial word
# Test git commit.*
pattern2 = re.compile(patterns[1])
assert pattern2.match('git commit')
assert pattern2.match("git commit -m 'message'")
assert pattern2.match('git commit --amend')
assert not pattern2.match('git status')
assert not pattern2.match('git push')
def test_piped_command_patterns(self):
"""Test patterns generated for piped commands."""
patterns = _generate_command_patterns('cat file.txt | grep pattern')
pattern = re.compile(patterns[0])
assert pattern.match('cat file.txt | grep pattern')
assert pattern.match('cat another.txt | grep something')
assert pattern.match('cat /path/to/file | grep test')
assert not pattern.match('cat file.txt')
assert not pattern.match('grep pattern')
def test_complex_command_patterns(self):
"""Test patterns for complex commands with special characters."""
patterns = _generate_single_command_pattern("find /home -name '*.py'")
# Test find(\s.*|$)
pattern1 = re.compile(patterns[0])
assert pattern1.match("find /home -name '*.py'")
assert pattern1.match('find . -type f')
assert pattern1.match('find /usr/bin -executable')
assert not pattern1.match('finder') # Should not match partial word
# Test find /home.*
pattern2 = re.compile(patterns[1])
assert pattern2.match("find /home -name '*.py'")
assert pattern2.match('find /home -type d')
assert not pattern2.match("find . -name '*.py'")
assert not pattern2.match("find /usr -name '*.py'")
@@ -1,48 +0,0 @@
"""Tests for the security config pipe parsing functionality."""
from openhands.security.command_approval.analyzer import CommandParser
class TestBashlexParsing:
"""Test bashlex parsing functionality."""
def test_bashlex_pipe_detection(self):
"""Test detection of piped commands using bashlex."""
parser = CommandParser()
# Commands with pipes
assert parser.is_piped_command('ls -la | grep .py')
assert parser.is_piped_command('cat file.txt | head -10 | tail -5')
assert parser.is_piped_command("find . -name '*.py' | xargs grep 'import'")
# Commands without pipes
assert not parser.is_piped_command('ls -la')
assert not parser.is_piped_command("echo 'hello world'")
assert not parser.is_piped_command('ls -la > output.txt')
# Edge cases
assert not parser.is_piped_command('')
# Pipe in quotes is not a real pipe
assert not parser.is_piped_command("echo 'hello | world'")
def test_bashlex_command_extraction(self):
"""Test extraction of commands from pipelines using bashlex."""
parser = CommandParser()
# Simple command
assert parser.parse_command('ls -la') == ['ls -la']
# Piped commands
assert parser.parse_command('ls -la | grep .py') == ['ls -la', 'grep .py']
assert parser.parse_command('cat file.txt | head -10 | tail -5') == [
'cat file.txt',
'head -10',
'tail -5',
]
# Commands with redirections
assert parser.parse_command('ls -la > output.txt') == ['ls -la']
# Edge cases
assert parser.parse_command('') == []
assert parser.parse_command("echo 'hello | world'") == ['echo hello | world']
@@ -0,0 +1,179 @@
import sys
from unittest.mock import patch
import pytest
from openhands.agenthub.codeact_agent.codeact_agent import CodeActAgent
from openhands.core.config import AgentConfig
from openhands.llm.llm import LLM
# Skip all tests in this module if not running on Windows
pytestmark = pytest.mark.skipif(
sys.platform != 'win32', reason='Windows prompt refinement tests require Windows'
)
@pytest.fixture
def mock_llm():
"""Create a mock LLM for testing."""
llm = LLM(config={'model': 'gpt-4', 'api_key': 'test'})
return llm
@pytest.fixture
def agent_config():
"""Create a basic agent config for testing."""
return AgentConfig()
def test_codeact_agent_system_prompt_no_bash_on_windows(mock_llm, agent_config):
"""Test that CodeActAgent's system prompt doesn't contain 'bash' on Windows."""
# Create a CodeActAgent instance
agent = CodeActAgent(llm=mock_llm, config=agent_config)
# Get the system prompt
system_prompt = agent.prompt_manager.get_system_message()
# Assert that 'bash' doesn't exist in the system prompt (case-insensitive)
assert 'bash' not in system_prompt.lower(), (
f"System prompt contains 'bash' on Windows platform. "
f"It should be replaced with 'powershell'. "
f'System prompt: {system_prompt}'
)
# Verify that 'powershell' exists instead (case-insensitive)
assert 'powershell' in system_prompt.lower(), (
f"System prompt should contain 'powershell' on Windows platform. "
f'System prompt: {system_prompt}'
)
def test_codeact_agent_tool_descriptions_no_bash_on_windows(mock_llm, agent_config):
"""Test that CodeActAgent's tool descriptions don't contain 'bash' on Windows."""
# Create a CodeActAgent instance
agent = CodeActAgent(llm=mock_llm, config=agent_config)
# Get the tools
tools = agent.tools
# Check each tool's description and parameters
for tool in tools:
if tool['type'] == 'function':
function_info = tool['function']
# Check function description
description = function_info.get('description', '')
assert 'bash' not in description.lower(), (
f"Tool '{function_info['name']}' description contains 'bash' on Windows. "
f'Description: {description}'
)
# Check parameter descriptions
parameters = function_info.get('parameters', {})
properties = parameters.get('properties', {})
for param_name, param_info in properties.items():
param_description = param_info.get('description', '')
assert 'bash' not in param_description.lower(), (
f"Tool '{function_info['name']}' parameter '{param_name}' "
f"description contains 'bash' on Windows. "
f'Parameter description: {param_description}'
)
def test_in_context_learning_example_no_bash_on_windows():
"""Test that in-context learning examples don't contain 'bash' on Windows."""
from openhands.agenthub.codeact_agent.tools.bash import create_cmd_run_tool
from openhands.agenthub.codeact_agent.tools.finish import FinishTool
from openhands.agenthub.codeact_agent.tools.str_replace_editor import (
create_str_replace_editor_tool,
)
from openhands.llm.fn_call_converter import get_example_for_tools
# Create a sample set of tools
tools = [
create_cmd_run_tool(),
create_str_replace_editor_tool(),
FinishTool,
]
# Get the in-context learning example
example = get_example_for_tools(tools)
# Assert that 'bash' doesn't exist in the example (case-insensitive)
assert 'bash' not in example.lower(), (
f"In-context learning example contains 'bash' on Windows platform. "
f"It should be replaced with 'powershell'. "
f'Example: {example}'
)
# Verify that 'powershell' exists instead (case-insensitive)
if example: # Only check if example is not empty
assert 'powershell' in example.lower(), (
f"In-context learning example should contain 'powershell' on Windows platform. "
f'Example: {example}'
)
def test_refine_prompt_function_works():
"""Test that the refine_prompt function correctly replaces 'bash' with 'powershell'."""
from openhands.agenthub.codeact_agent.tools.bash import refine_prompt
# Test basic replacement
test_prompt = 'Execute a bash command to list files'
refined_prompt = refine_prompt(test_prompt)
assert 'bash' not in refined_prompt.lower()
assert 'powershell' in refined_prompt.lower()
assert refined_prompt == 'Execute a powershell command to list files'
# Test multiple occurrences
test_prompt = 'Use bash to run bash commands in the bash shell'
refined_prompt = refine_prompt(test_prompt)
assert 'bash' not in refined_prompt.lower()
assert (
refined_prompt
== 'Use powershell to run powershell commands in the powershell shell'
)
# Test case sensitivity
test_prompt = 'BASH and Bash and bash should all be replaced'
refined_prompt = refine_prompt(test_prompt)
assert 'bash' not in refined_prompt.lower()
assert (
refined_prompt
== 'powershell and powershell and powershell should all be replaced'
)
# Test execute_bash tool name replacement
test_prompt = 'Use the execute_bash tool to run commands'
refined_prompt = refine_prompt(test_prompt)
assert 'execute_bash' not in refined_prompt.lower()
assert 'execute_powershell' in refined_prompt.lower()
assert refined_prompt == 'Use the execute_powershell tool to run commands'
# Test that words containing 'bash' but not equal to 'bash' are preserved
test_prompt = 'The bashful person likes bash-like syntax'
refined_prompt = refine_prompt(test_prompt)
# 'bashful' should be preserved, 'bash-like' should become 'powershell-like'
assert 'bashful' in refined_prompt
assert 'powershell-like' in refined_prompt
assert refined_prompt == 'The bashful person likes powershell-like syntax'
def test_refine_prompt_function_on_non_windows():
"""Test that the refine_prompt function doesn't change anything on non-Windows platforms."""
from openhands.agenthub.codeact_agent.tools.bash import refine_prompt
# Mock sys.platform to simulate non-Windows
with patch('openhands.agenthub.codeact_agent.tools.bash.sys.platform', 'linux'):
test_prompt = 'Execute a bash command to list files'
refined_prompt = refine_prompt(test_prompt)
# On non-Windows, the prompt should remain unchanged
assert refined_prompt == test_prompt
assert 'bash' in refined_prompt.lower()