mirror of
https://github.com/All-Hands-AI/OpenHands.git
synced 2026-04-29 03:00:45 -04:00
Compare commits
3 Commits
0.51.0
...
fix-direct
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c25795e094 | ||
|
|
7bb84c1d02 | ||
|
|
a88f8d3851 |
@@ -231,6 +231,26 @@ def split_is_actually_version(split: list[str]) -> bool:
|
||||
|
||||
|
||||
def read_file(file_path: str | Path) -> str:
|
||||
"""Read content from a file.
|
||||
|
||||
Args:
|
||||
file_path: Path to the file to read
|
||||
|
||||
Returns:
|
||||
str: Content of the file
|
||||
|
||||
Raises:
|
||||
IsADirectoryError: If the path is a directory
|
||||
FileNotFoundError: If the file doesn't exist
|
||||
PermissionError: If there are permission issues
|
||||
"""
|
||||
from pathlib import Path
|
||||
|
||||
path = Path(file_path)
|
||||
|
||||
if path.is_dir():
|
||||
raise IsADirectoryError(f"'{file_path}' is a directory, not a file")
|
||||
|
||||
with open(file_path, 'r') as f:
|
||||
return f.read()
|
||||
|
||||
|
||||
@@ -18,7 +18,24 @@ def read_input(cli_multiline_input: bool = False) -> str:
|
||||
|
||||
|
||||
def read_task_from_file(file_path: str) -> str:
|
||||
"""Read task from the specified file."""
|
||||
"""Read task from the specified file.
|
||||
|
||||
Args:
|
||||
file_path: Path to the file to read
|
||||
|
||||
Returns:
|
||||
str: Content of the file
|
||||
|
||||
Raises:
|
||||
IsADirectoryError: If the path is a directory
|
||||
FileNotFoundError: If the file doesn't exist
|
||||
PermissionError: If there are permission issues
|
||||
"""
|
||||
import os
|
||||
|
||||
if os.path.isdir(file_path):
|
||||
raise IsADirectoryError(f"'{file_path}' is a directory, not a file")
|
||||
|
||||
with open(file_path, 'r', encoding='utf-8') as file:
|
||||
return file.read()
|
||||
|
||||
@@ -31,7 +48,20 @@ def read_task(args: argparse.Namespace, cli_multiline_input: bool) -> str:
|
||||
# Determine the task
|
||||
task_str = ''
|
||||
if args.file:
|
||||
task_str = read_task_from_file(args.file)
|
||||
try:
|
||||
task_str = read_task_from_file(args.file)
|
||||
except IsADirectoryError as e:
|
||||
print(f'Error: {e}')
|
||||
sys.exit(1)
|
||||
except FileNotFoundError:
|
||||
print(f"Error: File '{args.file}' not found.")
|
||||
sys.exit(1)
|
||||
except PermissionError:
|
||||
print(f"Error: Permission denied when reading '{args.file}'.")
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
print(f"Error reading file '{args.file}': {e}")
|
||||
sys.exit(1)
|
||||
elif args.task:
|
||||
task_str = args.task
|
||||
elif not sys.stdin.isatty():
|
||||
|
||||
@@ -533,8 +533,15 @@ class CLIRuntime(Runtime):
|
||||
|
||||
file_path = self._sanitize_filename(action.path)
|
||||
|
||||
# Check if the file exists and is a directory first (before any file operations)
|
||||
if not os.path.exists(file_path):
|
||||
return ErrorObservation(f'File not found: {action.path}')
|
||||
|
||||
if os.path.isdir(file_path):
|
||||
return ErrorObservation(f'Cannot read directory: {action.path}')
|
||||
|
||||
# Cannot read binary files
|
||||
if os.path.exists(file_path) and is_binary(file_path):
|
||||
if is_binary(file_path):
|
||||
return ErrorObservation('ERROR_BINARY_FILE')
|
||||
|
||||
# Use OHEditor for OH_ACI implementation source
|
||||
@@ -552,14 +559,6 @@ class CLIRuntime(Runtime):
|
||||
)
|
||||
|
||||
try:
|
||||
# Check if the file exists
|
||||
if not os.path.exists(file_path):
|
||||
return ErrorObservation(f'File not found: {action.path}')
|
||||
|
||||
# Check if it's a directory
|
||||
if os.path.isdir(file_path):
|
||||
return ErrorObservation(f'Cannot read directory: {action.path}')
|
||||
|
||||
# Read the file
|
||||
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
|
||||
content = f.read()
|
||||
|
||||
@@ -41,16 +41,30 @@ def parse_pdf(file_path: str) -> None:
|
||||
Args:
|
||||
file_path: str: The path to the file to open.
|
||||
"""
|
||||
import os
|
||||
|
||||
print(f'[Reading PDF file from {file_path}]')
|
||||
content = PyPDF2.PdfReader(file_path)
|
||||
text = ''
|
||||
for page_idx in range(len(content.pages)):
|
||||
text += (
|
||||
f'@@ Page {page_idx + 1} @@\n'
|
||||
+ content.pages[page_idx].extract_text()
|
||||
+ '\n\n'
|
||||
)
|
||||
print(text.strip())
|
||||
|
||||
if not os.path.exists(file_path):
|
||||
print(f'ERROR: File not found: {file_path}')
|
||||
return
|
||||
|
||||
if os.path.isdir(file_path):
|
||||
print(f'ERROR: Cannot read directory as PDF file: {file_path}')
|
||||
return
|
||||
|
||||
try:
|
||||
content = PyPDF2.PdfReader(file_path)
|
||||
text = ''
|
||||
for page_idx in range(len(content.pages)):
|
||||
text += (
|
||||
f'@@ Page {page_idx + 1} @@\n'
|
||||
+ content.pages[page_idx].extract_text()
|
||||
+ '\n\n'
|
||||
)
|
||||
print(text.strip())
|
||||
except Exception as e:
|
||||
print(f'Error reading PDF file: {e}')
|
||||
|
||||
|
||||
def parse_docx(file_path: str) -> None:
|
||||
@@ -59,12 +73,26 @@ def parse_docx(file_path: str) -> None:
|
||||
Args:
|
||||
file_path: str: The path to the file to open.
|
||||
"""
|
||||
import os
|
||||
|
||||
print(f'[Reading DOCX file from {file_path}]')
|
||||
content = docx.Document(file_path)
|
||||
text = ''
|
||||
for i, para in enumerate(content.paragraphs):
|
||||
text += f'@@ Page {i + 1} @@\n' + para.text + '\n\n'
|
||||
print(text)
|
||||
|
||||
if not os.path.exists(file_path):
|
||||
print(f'ERROR: File not found: {file_path}')
|
||||
return
|
||||
|
||||
if os.path.isdir(file_path):
|
||||
print(f'ERROR: Cannot read directory as DOCX file: {file_path}')
|
||||
return
|
||||
|
||||
try:
|
||||
content = docx.Document(file_path)
|
||||
text = ''
|
||||
for i, para in enumerate(content.paragraphs):
|
||||
text += f'@@ Page {i + 1} @@\n' + para.text + '\n\n'
|
||||
print(text)
|
||||
except Exception as e:
|
||||
print(f'Error reading DOCX file: {e}')
|
||||
|
||||
|
||||
def parse_latex(file_path: str) -> None:
|
||||
@@ -73,7 +101,18 @@ def parse_latex(file_path: str) -> None:
|
||||
Args:
|
||||
file_path: str: The path to the file to open.
|
||||
"""
|
||||
import os
|
||||
|
||||
print(f'[Reading LaTex file from {file_path}]')
|
||||
|
||||
if not os.path.exists(file_path):
|
||||
print(f'ERROR: File not found: {file_path}')
|
||||
return
|
||||
|
||||
if os.path.isdir(file_path):
|
||||
print(f'ERROR: Cannot read directory as LaTeX file: {file_path}')
|
||||
return
|
||||
|
||||
with open(file_path) as f:
|
||||
data = f.read()
|
||||
text = LatexNodes2Text().latex_to_text(data)
|
||||
@@ -81,6 +120,14 @@ def parse_latex(file_path: str) -> None:
|
||||
|
||||
|
||||
def _base64_img(file_path: str) -> str:
|
||||
import os
|
||||
|
||||
if not os.path.exists(file_path):
|
||||
raise FileNotFoundError(f'File not found: {file_path}')
|
||||
|
||||
if os.path.isdir(file_path):
|
||||
raise IsADirectoryError(f'Cannot read directory as image file: {file_path}')
|
||||
|
||||
with open(file_path, 'rb') as image_file:
|
||||
encoded_image = base64.b64encode(image_file.read()).decode('utf-8')
|
||||
return encoded_image
|
||||
@@ -126,7 +173,18 @@ def parse_audio(file_path: str, model: str = 'whisper-1') -> None:
|
||||
file_path: str: The path to the audio file to transcribe.
|
||||
model: str: The audio model to use for transcription. Defaults to 'whisper-1'.
|
||||
"""
|
||||
import os
|
||||
|
||||
print(f'[Transcribing audio file from {file_path}]')
|
||||
|
||||
if not os.path.exists(file_path):
|
||||
print(f'ERROR: File not found: {file_path}')
|
||||
return
|
||||
|
||||
if os.path.isdir(file_path):
|
||||
print(f'ERROR: Cannot read directory as audio file: {file_path}')
|
||||
return
|
||||
|
||||
try:
|
||||
# TODO: record the COST of the API call
|
||||
with open(file_path, 'rb') as audio_file:
|
||||
@@ -217,7 +275,18 @@ def parse_pptx(file_path: str) -> None:
|
||||
Args:
|
||||
file_path: str: The path to the file to open.
|
||||
"""
|
||||
import os
|
||||
|
||||
print(f'[Reading PowerPoint file from {file_path}]')
|
||||
|
||||
if not os.path.exists(file_path):
|
||||
print(f'ERROR: File not found: {file_path}')
|
||||
return
|
||||
|
||||
if os.path.isdir(file_path):
|
||||
print(f'ERROR: Cannot read directory as PowerPoint file: {file_path}')
|
||||
return
|
||||
|
||||
try:
|
||||
pres = Presentation(str(file_path))
|
||||
text = []
|
||||
|
||||
83
tests/unit/test_cli_runtime_directory_fix.py
Normal file
83
tests/unit/test_cli_runtime_directory_fix.py
Normal file
@@ -0,0 +1,83 @@
|
||||
"""Unit tests for CLI runtime directory error handling."""
|
||||
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
from openhands.events.action import FileReadAction
|
||||
from openhands.events.event import FileReadSource
|
||||
from openhands.events.observation import ErrorObservation, FileReadObservation
|
||||
from openhands.runtime.impl.cli.cli_runtime import CLIRuntime
|
||||
|
||||
|
||||
class TestCLIRuntimeDirectoryHandling:
|
||||
"""Test CLI runtime directory error handling."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Set up test fixtures."""
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.test_dir = Path(self.temp_dir) / 'test_directory'
|
||||
self.test_dir.mkdir()
|
||||
|
||||
self.test_file = Path(self.temp_dir) / 'test_file.txt'
|
||||
self.test_file.write_text('This is a test file.')
|
||||
|
||||
# Create minimal runtime instance
|
||||
self.runtime = CLIRuntime.__new__(CLIRuntime)
|
||||
self.runtime._runtime_initialized = True
|
||||
self.runtime._workspace_path = self.temp_dir
|
||||
|
||||
def teardown_method(self):
|
||||
"""Clean up test fixtures."""
|
||||
import shutil
|
||||
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
def test_read_directory_returns_error(self):
|
||||
"""Test that reading a directory returns proper error message."""
|
||||
action = FileReadAction(path=str(self.test_dir))
|
||||
result = self.runtime.read(action)
|
||||
|
||||
assert isinstance(result, ErrorObservation)
|
||||
assert 'Cannot read directory' in result.content
|
||||
assert str(self.test_dir) in result.content
|
||||
|
||||
def test_read_nonexistent_file_returns_error(self):
|
||||
"""Test that reading a non-existent file returns proper error message."""
|
||||
nonexistent_file = Path(self.temp_dir) / 'nonexistent.txt'
|
||||
action = FileReadAction(path=str(nonexistent_file))
|
||||
result = self.runtime.read(action)
|
||||
|
||||
assert isinstance(result, ErrorObservation)
|
||||
assert 'File not found' in result.content
|
||||
assert str(nonexistent_file) in result.content
|
||||
|
||||
def test_read_valid_file_succeeds(self):
|
||||
"""Test that reading a valid file works correctly."""
|
||||
action = FileReadAction(path=str(self.test_file))
|
||||
result = self.runtime.read(action)
|
||||
|
||||
assert isinstance(result, FileReadObservation)
|
||||
assert 'This is a test file' in result.content
|
||||
|
||||
def test_read_directory_with_oh_aci_returns_error(self):
|
||||
"""Test that reading a directory with OH_ACI source returns proper error message."""
|
||||
action = FileReadAction(
|
||||
path=str(self.test_dir), impl_source=FileReadSource.OH_ACI
|
||||
)
|
||||
result = self.runtime.read(action)
|
||||
|
||||
assert isinstance(result, ErrorObservation)
|
||||
assert 'Cannot read directory' in result.content
|
||||
assert str(self.test_dir) in result.content
|
||||
|
||||
def test_read_binary_file_returns_error(self):
|
||||
"""Test that reading a binary file returns proper error message."""
|
||||
# Create a binary file
|
||||
binary_file = Path(self.temp_dir) / 'test.bin'
|
||||
binary_file.write_bytes(b'\x00\x01\x02\x03')
|
||||
|
||||
action = FileReadAction(path=str(binary_file))
|
||||
result = self.runtime.read(action)
|
||||
|
||||
assert isinstance(result, ErrorObservation)
|
||||
assert 'ERROR_BINARY_FILE' in result.content
|
||||
@@ -461,6 +461,26 @@ class TestFileOperations:
|
||||
|
||||
assert result == mock_content
|
||||
|
||||
def test_read_file_directory_error(self):
|
||||
"""Test that read_file raises IsADirectoryError when trying to read a directory."""
|
||||
with patch('pathlib.Path.is_dir', return_value=True):
|
||||
try:
|
||||
read_file('/some/directory')
|
||||
raise AssertionError('Expected IsADirectoryError to be raised')
|
||||
except IsADirectoryError as e:
|
||||
assert 'is a directory, not a file' in str(e)
|
||||
assert '/some/directory' in str(e)
|
||||
|
||||
def test_read_file_regular_file(self):
|
||||
"""Test that read_file works normally for regular files."""
|
||||
mock_content = 'test file content'
|
||||
with (
|
||||
patch('pathlib.Path.is_dir', return_value=False),
|
||||
patch('builtins.open', mock_open(read_data=mock_content)),
|
||||
):
|
||||
result = read_file('test.txt')
|
||||
assert result == mock_content
|
||||
|
||||
def test_write_to_file(self):
|
||||
mock_content = 'test file content'
|
||||
mock_file = mock_open()
|
||||
|
||||
110
tests/unit/test_file_reader_directory_fix.py
Normal file
110
tests/unit/test_file_reader_directory_fix.py
Normal file
@@ -0,0 +1,110 @@
|
||||
"""Unit tests for file reader directory error handling."""
|
||||
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from openhands.runtime.plugins.agent_skills.file_reader.file_readers import (
|
||||
_base64_img,
|
||||
parse_audio,
|
||||
parse_docx,
|
||||
parse_latex,
|
||||
parse_pdf,
|
||||
parse_pptx,
|
||||
)
|
||||
|
||||
|
||||
class TestFileReaderDirectoryHandling:
|
||||
"""Test file reader directory error handling."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Set up test fixtures."""
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.test_dir = Path(self.temp_dir) / 'test_directory'
|
||||
self.test_dir.mkdir()
|
||||
|
||||
self.test_file = Path(self.temp_dir) / 'test_file.txt'
|
||||
self.test_file.write_text('This is a test file.')
|
||||
|
||||
def teardown_method(self):
|
||||
"""Clean up test fixtures."""
|
||||
import shutil
|
||||
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
def test_parse_latex_with_directory(self, capsys):
|
||||
"""Test that parse_latex handles directory input gracefully."""
|
||||
parse_latex(str(self.test_dir))
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert 'ERROR: Cannot read directory as LaTeX file' in captured.out
|
||||
assert str(self.test_dir) in captured.out
|
||||
|
||||
def test_parse_latex_with_nonexistent_file(self, capsys):
|
||||
"""Test that parse_latex handles non-existent file gracefully."""
|
||||
nonexistent_file = Path(self.temp_dir) / 'nonexistent.tex'
|
||||
parse_latex(str(nonexistent_file))
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert 'ERROR: File not found' in captured.out
|
||||
assert str(nonexistent_file) in captured.out
|
||||
|
||||
def test_base64_img_with_directory(self):
|
||||
"""Test that _base64_img raises IsADirectoryError for directory input."""
|
||||
with pytest.raises(IsADirectoryError) as exc_info:
|
||||
_base64_img(str(self.test_dir))
|
||||
|
||||
assert 'Cannot read directory as image file' in str(exc_info.value)
|
||||
assert str(self.test_dir) in str(exc_info.value)
|
||||
|
||||
def test_base64_img_with_nonexistent_file(self):
|
||||
"""Test that _base64_img raises FileNotFoundError for non-existent file."""
|
||||
nonexistent_file = Path(self.temp_dir) / 'nonexistent.jpg'
|
||||
|
||||
with pytest.raises(FileNotFoundError) as exc_info:
|
||||
_base64_img(str(nonexistent_file))
|
||||
|
||||
assert 'File not found' in str(exc_info.value)
|
||||
assert str(nonexistent_file) in str(exc_info.value)
|
||||
|
||||
def test_parse_audio_with_directory(self, capsys):
|
||||
"""Test that parse_audio handles directory input gracefully."""
|
||||
parse_audio(str(self.test_dir))
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert 'ERROR: Cannot read directory as audio file' in captured.out
|
||||
assert str(self.test_dir) in captured.out
|
||||
|
||||
def test_parse_audio_with_nonexistent_file(self, capsys):
|
||||
"""Test that parse_audio handles non-existent file gracefully."""
|
||||
nonexistent_file = Path(self.temp_dir) / 'nonexistent.mp3'
|
||||
parse_audio(str(nonexistent_file))
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert 'ERROR: File not found' in captured.out
|
||||
assert str(nonexistent_file) in captured.out
|
||||
|
||||
def test_parse_pdf_with_directory(self, capsys):
|
||||
"""Test that parse_pdf handles directory input gracefully."""
|
||||
parse_pdf(str(self.test_dir))
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert 'ERROR: Cannot read directory as PDF file' in captured.out
|
||||
assert str(self.test_dir) in captured.out
|
||||
|
||||
def test_parse_docx_with_directory(self, capsys):
|
||||
"""Test that parse_docx handles directory input gracefully."""
|
||||
parse_docx(str(self.test_dir))
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert 'ERROR: Cannot read directory as DOCX file' in captured.out
|
||||
assert str(self.test_dir) in captured.out
|
||||
|
||||
def test_parse_pptx_with_directory(self, capsys):
|
||||
"""Test that parse_pptx handles directory input gracefully."""
|
||||
parse_pptx(str(self.test_dir))
|
||||
|
||||
captured = capsys.readouterr()
|
||||
assert 'ERROR: Cannot read directory as PowerPoint file' in captured.out
|
||||
assert str(self.test_dir) in captured.out
|
||||
@@ -1,7 +1,7 @@
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import mock_open, patch
|
||||
|
||||
from openhands.core.config import OpenHandsConfig
|
||||
from openhands.io import read_input
|
||||
from openhands.io import read_input, read_task_from_file
|
||||
|
||||
|
||||
def test_single_line_input():
|
||||
@@ -25,3 +25,26 @@ def test_multiline_input():
|
||||
with patch('builtins.input', side_effect=mock_inputs):
|
||||
result = read_input(config.cli_multiline_input)
|
||||
assert result == 'line 1\nline 2\nline 3'
|
||||
|
||||
|
||||
def test_read_task_from_file():
|
||||
"""Test that read_task_from_file works correctly for regular files"""
|
||||
mock_content = 'This is a task from a file'
|
||||
|
||||
with (
|
||||
patch('os.path.isdir', return_value=False),
|
||||
patch('builtins.open', mock_open(read_data=mock_content)),
|
||||
):
|
||||
result = read_task_from_file('task.txt')
|
||||
assert result == mock_content
|
||||
|
||||
|
||||
def test_read_task_from_file_directory_error():
|
||||
"""Test that read_task_from_file raises IsADirectoryError when trying to read a directory"""
|
||||
with patch('os.path.isdir', return_value=True):
|
||||
try:
|
||||
read_task_from_file('/some/directory')
|
||||
raise AssertionError('Expected IsADirectoryError to be raised')
|
||||
except IsADirectoryError as e:
|
||||
assert 'is a directory, not a file' in str(e)
|
||||
assert '/some/directory' in str(e)
|
||||
|
||||
Reference in New Issue
Block a user