mirror of
https://github.com/All-Hands-AI/OpenHands.git
synced 2026-04-29 03:00:45 -04:00
Compare commits
1 Commits
refactor/a
...
openhands-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3e9985f19a |
@@ -14,7 +14,7 @@ OpenHands includes and adapts the following open source projects. We are gratefu
|
||||
|
||||
#### [Aider](https://github.com/paul-gauthier/aider)
|
||||
- License: Apache License 2.0
|
||||
- Description: AI pair programming tool. OpenHands has adapted and integrated its linter module for code-related tasks in [`agentskills utilities`](https://github.com/All-Hands-AI/OpenHands/tree/main/openhands/runtime/plugins/agent_skills/utils/aider)
|
||||
- Description: AI pair programming tool. OpenHands has adapted and integrated its linter module for code-related tasks
|
||||
|
||||
#### [BrowserGym](https://github.com/ServiceNow/BrowserGym)
|
||||
- License: Apache License 2.0
|
||||
|
||||
@@ -37,7 +37,6 @@ from openhands.events.observation.observation import Observation
|
||||
from openhands.events.serialization.event import truncate_content
|
||||
from openhands.llm.llm import LLM
|
||||
from openhands.runtime.plugins import (
|
||||
AgentSkillsRequirement,
|
||||
JupyterRequirement,
|
||||
PluginRequirement,
|
||||
)
|
||||
@@ -66,10 +65,6 @@ class CodeActAgent(Agent):
|
||||
"""
|
||||
|
||||
sandbox_plugins: list[PluginRequirement] = [
|
||||
# NOTE: AgentSkillsRequirement need to go before JupyterRequirement, since
|
||||
# AgentSkillsRequirement provides a lot of Python functions,
|
||||
# and it needs to be initialized before Jupyter for Jupyter to use those functions.
|
||||
AgentSkillsRequirement(),
|
||||
JupyterRequirement(),
|
||||
]
|
||||
|
||||
|
||||
@@ -501,7 +501,7 @@ def response_to_actions(response: ModelResponse) -> list[Action]:
|
||||
elif tool_call.function.name == 'edit_file':
|
||||
action = FileEditAction(**arguments)
|
||||
elif tool_call.function.name == 'str_replace_editor':
|
||||
# We implement this in agent_skills, which can be used via Jupyter
|
||||
# This is now implemented via function calling
|
||||
# convert tool_call.function.arguments to kwargs that can be passed to file_editor
|
||||
code = f'print(file_editor(**{arguments}))'
|
||||
logger.debug(
|
||||
|
||||
@@ -128,16 +128,7 @@ class ActionExecutor:
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
# This is a temporary workaround
|
||||
# TODO: refactor AgentSkills to be part of JupyterPlugin
|
||||
# AFTER ServerRuntime is deprecated
|
||||
if 'agent_skills' in self.plugins and 'jupyter' in self.plugins:
|
||||
obs = await self.run_ipython(
|
||||
IPythonRunCellAction(
|
||||
code='from openhands.runtime.plugins.agent_skills.agentskills import *\n'
|
||||
)
|
||||
)
|
||||
logger.debug(f'AgentSkills initialized: {obs}')
|
||||
|
||||
|
||||
await self._init_bash_commands()
|
||||
logger.debug('Runtime client initialized.')
|
||||
|
||||
@@ -1,8 +1,4 @@
|
||||
# Requirements
|
||||
from openhands.runtime.plugins.agent_skills import (
|
||||
AgentSkillsPlugin,
|
||||
AgentSkillsRequirement,
|
||||
)
|
||||
from openhands.runtime.plugins.jupyter import JupyterPlugin, JupyterRequirement
|
||||
from openhands.runtime.plugins.requirement import Plugin, PluginRequirement
|
||||
from openhands.runtime.plugins.vscode import VSCodePlugin, VSCodeRequirement
|
||||
@@ -10,8 +6,6 @@ from openhands.runtime.plugins.vscode import VSCodePlugin, VSCodeRequirement
|
||||
__all__ = [
|
||||
'Plugin',
|
||||
'PluginRequirement',
|
||||
'AgentSkillsRequirement',
|
||||
'AgentSkillsPlugin',
|
||||
'JupyterRequirement',
|
||||
'JupyterPlugin',
|
||||
'VSCodeRequirement',
|
||||
@@ -20,6 +14,5 @@ __all__ = [
|
||||
|
||||
ALL_PLUGINS = {
|
||||
'jupyter': JupyterPlugin,
|
||||
'agent_skills': AgentSkillsPlugin,
|
||||
'vscode': VSCodePlugin,
|
||||
}
|
||||
|
||||
@@ -1,57 +0,0 @@
|
||||
# OpenHands Skill Sets
|
||||
|
||||
This folder implements a skill/tool set `agentskills` for OpenHands.
|
||||
|
||||
It is intended to be used by the agent **inside sandbox**.
|
||||
The skill set will be exposed as a `pip` package that can be installed as a plugin inside the sandbox.
|
||||
|
||||
The skill set can contain a bunch of wrapped tools for agent ([many examples here](https://github.com/All-Hands-AI/OpenHands/pull/1914)), for example:
|
||||
- Audio/Video to text (these are a temporary solution, and we should switch to multimodal models when they are sufficiently cheap)
|
||||
- PDF to text
|
||||
- etc.
|
||||
|
||||
# Inclusion Criteria
|
||||
|
||||
We are walking a fine line here.
|
||||
We DON'T want to *wrap* every possible Python package and re-teach the agent its usage (e.g., the LLM already knows `pandas` pretty well, so we don't really need to create a skill that reads `csv` - it can just use `pandas`).
|
||||
|
||||
We ONLY want to add a new skill, when:
|
||||
- The skill is not something the LLM can easily achieve by writing code directly (e.g., edit code and replace a certain line)
|
||||
- It involves calling an external model (e.g., you need to call a speech to text model, editor model for speculative editing)
|
||||
|
||||
# Intended functionality
|
||||
|
||||
- Tool/skill usage (through `IPythonRunAction`)
|
||||
|
||||
```python
|
||||
# In[1]
|
||||
from agentskills import open_file, edit_file
|
||||
open_file("/workspace/a.txt")
|
||||
# Out[1]
|
||||
[SWE-agent open output]
|
||||
|
||||
# In[2]
|
||||
edit_file(
|
||||
"/workspace/a.txt",
|
||||
start=1, end=3,
|
||||
content=(
|
||||
("REPLACE TEXT")
|
||||
))
|
||||
# Out[2]
|
||||
[SWE-agent edit output]
|
||||
```
|
||||
|
||||
- Tool/skill retrieval (through `IPythonRunAction`)
|
||||
|
||||
```python
|
||||
# In[1]
|
||||
from agentskills import help_me
|
||||
|
||||
help_me("I want to solve a task that involves reading a bunch of PDFs and reason about them")
|
||||
|
||||
# Out[1]
|
||||
"Here are the top skills that may be helpful to you:
|
||||
- `pdf_to_text`: [documentation about the tools]
|
||||
...
|
||||
"
|
||||
```
|
||||
@@ -1,14 +0,0 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
from openhands.runtime.plugins.agent_skills import agentskills
|
||||
from openhands.runtime.plugins.requirement import Plugin, PluginRequirement
|
||||
|
||||
|
||||
@dataclass
class AgentSkillsRequirement(PluginRequirement):
    # Identifier under which this plugin requirement is registered.
    name: str = 'agent_skills'
    # Text taken from agentskills.DOCUMENTATION, evaluated once when this
    # class body is executed.
    documentation: str = agentskills.DOCUMENTATION
|
||||
|
||||
|
||||
class AgentSkillsPlugin(Plugin):
    # Uses the same identifier as AgentSkillsRequirement.name.
    name: str = 'agent_skills'
|
||||
@@ -1,31 +0,0 @@
|
||||
from inspect import signature
|
||||
|
||||
from openhands.runtime.plugins.agent_skills import file_ops, file_reader
|
||||
from openhands.runtime.plugins.agent_skills.utils.dependency import import_functions
|
||||
|
||||
import_functions(
|
||||
module=file_ops, function_names=file_ops.__all__, target_globals=globals()
|
||||
)
|
||||
import_functions(
|
||||
module=file_reader, function_names=file_reader.__all__, target_globals=globals()
|
||||
)
|
||||
__all__ = file_ops.__all__ + file_reader.__all__
|
||||
|
||||
# Build a plain-text reference of every exported skill: one entry per function,
# made of its signature followed by its docstring re-indented by four spaces.
DOCUMENTATION = ''
for func_name in __all__:
    func = globals()[func_name]

    # A function without a docstring has ``__doc__ is None``; treat it as empty
    # instead of crashing the whole module import on ``None.split``.
    cur_doc = func.__doc__ or ''
    # remove indentation from docstring and extra empty lines
    cur_doc = '\n'.join(line.strip() for line in cur_doc.split('\n') if line.strip())
    # now add a consistent 4 indentation
    cur_doc = '\n'.join(' ' * 4 + line for line in cur_doc.split('\n'))

    fn_signature = f'{func.__name__}' + str(signature(func))
    DOCUMENTATION += f'{fn_signature}:\n{cur_doc}\n\n'
|
||||
|
||||
|
||||
# Add file_editor (a function)
|
||||
from openhands.runtime.plugins.agent_skills.file_editor import file_editor # noqa: E402
|
||||
|
||||
__all__ += ['file_editor']
|
||||
@@ -1,3 +0,0 @@
|
||||
# File Editor
|
||||
|
||||
This file editor is largely based on Anthropic's released [`str_replace_editor`](https://github.com/anthropics/anthropic-quickstarts/tree/main/computer-use-demo/computer_use_demo/tools/edit.py). The original code was released under the [MIT license](https://github.com/anthropics/anthropic-quickstarts/blob/e373524f07594d48c3f9563248ea282a4c306c0c/LICENSE).
|
||||
@@ -1,8 +0,0 @@
|
||||
"""This file imports a global singleton of the `EditTool` class as well as raw functions that expose
|
||||
its __call__.
|
||||
The implementation of the `EditTool` class can be found at: https://github.com/All-Hands-AI/openhands-aci/.
|
||||
"""
|
||||
|
||||
from openhands_aci.editor import file_editor
|
||||
|
||||
__all__ = ['file_editor']
|
||||
@@ -1,7 +0,0 @@
|
||||
from openhands.runtime.plugins.agent_skills.file_ops import file_ops
|
||||
from openhands.runtime.plugins.agent_skills.utils.dependency import import_functions
|
||||
|
||||
import_functions(
|
||||
module=file_ops, function_names=file_ops.__all__, target_globals=globals()
|
||||
)
|
||||
__all__ = file_ops.__all__
|
||||
@@ -1,381 +0,0 @@
|
||||
"""file_ops.py
|
||||
|
||||
This module provides various file manipulation skills for the OpenHands agent.
|
||||
|
||||
Functions:
|
||||
- open_file(path: str, line_number: int | None = 1, context_lines: int = 100): Opens a file and optionally moves to a specific line.
|
||||
- goto_line(line_number: int): Moves the window to show the specified line number.
|
||||
- scroll_down(): Moves the window down by the number of lines specified in WINDOW.
|
||||
- scroll_up(): Moves the window up by the number of lines specified in WINDOW.
|
||||
- search_dir(search_term: str, dir_path: str = './'): Searches for a term in all files in the specified directory.
|
||||
- search_file(search_term: str, file_path: str | None = None): Searches for a term in the specified file or the currently open file.
|
||||
- find_file(file_name: str, dir_path: str = './'): Finds all files with the given name in the specified directory.
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from openhands.linter import DefaultLinter, LintResult
|
||||
|
||||
CURRENT_FILE: str | None = None
|
||||
CURRENT_LINE = 1
|
||||
WINDOW = 100
|
||||
|
||||
# This is also used in unit tests!
|
||||
MSG_FILE_UPDATED = '[File updated (edited at line {line_number}). Please review the changes and make sure they are correct (correct indentation, no duplicate lines, etc). Edit the file again if necessary.]'
|
||||
LINTER_ERROR_MSG = '[Your proposed edit has introduced new syntax error(s). Please understand the errors and retry your edit command.]\n'
|
||||
|
||||
|
||||
# ==================================================================================================
|
||||
|
||||
|
||||
def _output_error(error_msg: str) -> bool:
    """Print ``error_msg`` prefixed with ``ERROR: `` and return ``False``.

    The constant ``False`` lets callers write ``return _output_error(...)``.
    """
    print('ERROR:', error_msg)
    return False
|
||||
|
||||
|
||||
def _is_valid_filename(file_name) -> bool:
    """Return True when ``file_name`` is a non-blank string without forbidden characters.

    On POSIX only the NUL character is forbidden; on every other platform the
    Windows set ``<>:"/\\|?*`` is used.
    """
    if not (isinstance(file_name, str) and file_name.strip()):
        return False

    if os.name == 'posix':  # Unix-like systems
        forbidden = '\0'
    else:  # Windows ('nt') and any other platform share the same set
        forbidden = '<>:"/\\|?*'

    return not any(symbol in file_name for symbol in forbidden)
|
||||
|
||||
|
||||
def _is_valid_path(path) -> bool:
    """Return True when ``path`` is a non-empty string naming an existing filesystem entry.

    A ``PermissionError`` raised while checking is reported as ``False``.
    """
    if not isinstance(path, str) or not path:
        return False
    try:
        normalized = os.path.normpath(path)
        return os.path.exists(normalized)
    except PermissionError:
        return False
|
||||
|
||||
|
||||
def _create_paths(file_name) -> bool:
    """Create the parent directories of ``file_name`` if it has any.

    Returns:
        bool: ``True`` on success (including when there is no directory part),
        ``False`` when a ``PermissionError`` is raised.
    """
    try:
        parent = os.path.dirname(file_name)
        if parent:
            os.makedirs(parent, exist_ok=True)
    except PermissionError:
        return False
    return True
|
||||
|
||||
|
||||
def _check_current_file(file_path: str | None = None) -> bool:
|
||||
global CURRENT_FILE
|
||||
if not file_path:
|
||||
file_path = CURRENT_FILE
|
||||
if not file_path or not os.path.isfile(file_path):
|
||||
return _output_error('No file open. Use the open_file function first.')
|
||||
return True
|
||||
|
||||
|
||||
def _clamp(value, min_value, max_value):
    """Limit ``value`` to the closed range ``[min_value, max_value]``."""
    capped = min(value, max_value)
    return max(min_value, capped)
|
||||
|
||||
|
||||
def _lint_file(file_path: str) -> tuple[str | None, int | None]:
    """Lint the file at ``file_path`` with ``DefaultLinter`` and summarize the findings.

    Returns:
        tuple[str | None, int | None]: ``(lint_error, first_error_line_number)``.
        Both items are ``None`` when the linter reports nothing; otherwise the
        first item is an ``ERRORS:`` report with one ``path:line:column: message``
        row per finding and the second is the line of the first finding.
    """
    findings = DefaultLinter().lint(file_path)
    if not findings:
        # Linting successful. No issues found.
        return None, None

    report_rows = [
        f'{file_path}:{item.line}:{item.column}: {item.message}' for item in findings
    ]
    return 'ERRORS:\n' + '\n'.join(report_rows), findings[0].line
|
||||
|
||||
|
||||
def _print_window(
    file_path, targeted_line, window, return_str=False, ignore_window=False
):
    """Render a numbered window of lines from ``file_path``.

    The global ``CURRENT_LINE`` is updated to ``targeted_line`` clamped to the
    file's line range.

    Args:
        file_path: Path of the file to read.
        targeted_line: 1-based line the window is positioned on.
        window: Maximum number of lines the window is sized for.
        return_str: When true, return the rendered text instead of printing it.
        ignore_window: When true, the window starts at the targeted line
            (used by the scroll functions); otherwise it is centered on it.

    Returns:
        The rendered text when ``return_str`` is true, otherwise ``None``.
    """
    global CURRENT_LINE
    # NOTE(review): the boolean result is ignored here, so a missing file still
    # reaches open() below and raises - confirm whether that is intended.
    _check_current_file(file_path)
    with open(file_path) as file:
        content = file.read()

        # Ensure the content ends with a newline character
        if not content.endswith('\n'):
            content += '\n'

        lines = content.splitlines(True)  # Keep all line ending characters
        total_lines = len(lines)

        # cover edge cases
        CURRENT_LINE = _clamp(targeted_line, 1, total_lines)
        half_window = max(1, window // 2)
        if ignore_window:
            # Use CURRENT_LINE as starting line (for e.g. scroll_down)
            start = max(1, CURRENT_LINE)
            end = min(total_lines, CURRENT_LINE + window)
        else:
            # Ensure at least one line above and below the targeted line
            start = max(1, CURRENT_LINE - half_window)
            end = min(total_lines, CURRENT_LINE + half_window)

        # Adjust start and end to ensure at least one line above and below
        # NOTE(review): the scraped source lost its indentation; these two
        # adjustments are assumed to apply to both branches above - confirm.
        if start == 1:
            end = min(total_lines, start + window - 1)
        if end == total_lines:
            start = max(1, end - window + 1)

        output = ''

        # only display this when there's at least one line above
        if start > 1:
            output += f'({start - 1} more lines above)\n'
        else:
            output += '(this is the beginning of the file)\n'
        for i in range(start, end + 1):
            # Each row is "<line number>|<line text>".
            _new_line = f'{i}|{lines[i-1]}'
            # splitlines() can also split on terminators other than '\n',
            # so force every rendered row to end with '\n'.
            if not _new_line.endswith('\n'):
                _new_line += '\n'
            output += _new_line
        if end < total_lines:
            output += f'({total_lines - end} more lines below)\n'
        else:
            output += '(this is the end of the file)\n'
        output = output.rstrip()

        if return_str:
            return output
        else:
            print(output)
|
||||
|
||||
|
||||
def _cur_file_header(current_file, total_lines) -> str:
    """Return the ``[File: <absolute path> (<n> lines total)]`` header line.

    An empty string is returned when ``current_file`` is falsy.
    """
    if current_file:
        absolute = os.path.abspath(current_file)
        return f'[File: {absolute} ({total_lines} lines total)]\n'
    return ''
|
||||
|
||||
|
||||
def open_file(
    path: str, line_number: int | None = 1, context_lines: int | None = WINDOW
) -> None:
    """Opens the file at the given path in the editor. IF the file is to be edited, first use `scroll_down` repeatedly to read the full file!
    If line_number is provided, the window will be moved to include that line.
    It only shows the first 100 lines by default! `context_lines` is the max number of lines to be displayed, up to 100. Use `scroll_up` and `scroll_down` to view more content up or down.

    Args:
        path: str: The path to the file to open, preferred absolute path.
        line_number: int | None = 1: The line number to move to. Defaults to 1.
        context_lines: int | None = 100: Only shows this number of lines in the context window (usually from line 1), with line_number as the center (if possible). Defaults to 100.
    """
    global CURRENT_FILE, CURRENT_LINE

    if not os.path.isfile(path):
        _output_error(f'File {path} not found.')
        return

    # The file becomes the current file before the line number is validated.
    CURRENT_FILE = os.path.abspath(path)
    with open(CURRENT_FILE) as handle:
        counted = sum(1 for _ in handle)
    total_lines = max(1, counted)

    line_ok = isinstance(line_number, int) and 1 <= line_number <= total_lines
    if not line_ok:
        _output_error(f'Line number must be between 1 and {total_lines}')
        return
    CURRENT_LINE = line_number

    # Fall back to WINDOW when no usable context size was given.
    if context_lines is None or context_lines < 1:
        context_lines = WINDOW

    header = _cur_file_header(CURRENT_FILE, total_lines)
    body = _print_window(
        CURRENT_FILE,
        CURRENT_LINE,
        _clamp(context_lines, 1, 100),
        return_str=True,
        ignore_window=False,
    )
    output = header + body
    if output.strip().endswith('more lines below)'):
        output += '\n[Use `scroll_down` to view the next 100 lines of the file!]'
    print(output)
|
||||
|
||||
|
||||
def goto_line(line_number: int) -> None:
    """Moves the window to show the specified line number.

    Prints an error and does nothing when no file is open or when
    ``line_number`` is outside the file.

    Args:
        line_number: int: The line number to move to.
    """
    global CURRENT_FILE, CURRENT_LINE, WINDOW
    # Stop here when no file is open; otherwise open(str(None)) below would
    # raise FileNotFoundError for a file literally named 'None'.
    if not _check_current_file():
        return

    with open(str(CURRENT_FILE)) as file:
        total_lines = max(1, sum(1 for _ in file))
    if not isinstance(line_number, int) or line_number < 1 or line_number > total_lines:
        _output_error(f'Line number must be between 1 and {total_lines}.')
        return

    CURRENT_LINE = _clamp(line_number, 1, total_lines)

    output = _cur_file_header(CURRENT_FILE, total_lines)
    output += _print_window(
        CURRENT_FILE, CURRENT_LINE, WINDOW, return_str=True, ignore_window=False
    )
    print(output)
|
||||
|
||||
|
||||
def scroll_down() -> None:
    """Moves the window down by 100 lines.

    Prints an error and does nothing when no file is open.

    Args:
        None
    """
    global CURRENT_FILE, CURRENT_LINE, WINDOW
    # Stop here when no file is open; otherwise open(str(None)) below would
    # raise FileNotFoundError for a file literally named 'None'.
    if not _check_current_file():
        return

    with open(str(CURRENT_FILE)) as file:
        total_lines = max(1, sum(1 for _ in file))
    CURRENT_LINE = _clamp(CURRENT_LINE + WINDOW, 1, total_lines)
    output = _cur_file_header(CURRENT_FILE, total_lines)
    output += _print_window(
        CURRENT_FILE, CURRENT_LINE, WINDOW, return_str=True, ignore_window=True
    )
    print(output)
|
||||
|
||||
|
||||
def scroll_up() -> None:
    """Moves the window up by 100 lines.

    Prints an error and does nothing when no file is open.

    Args:
        None
    """
    global CURRENT_FILE, CURRENT_LINE, WINDOW
    # Stop here when no file is open; otherwise open(str(None)) below would
    # raise FileNotFoundError for a file literally named 'None'.
    if not _check_current_file():
        return

    with open(str(CURRENT_FILE)) as file:
        total_lines = max(1, sum(1 for _ in file))
    CURRENT_LINE = _clamp(CURRENT_LINE - WINDOW, 1, total_lines)
    output = _cur_file_header(CURRENT_FILE, total_lines)
    output += _print_window(
        CURRENT_FILE, CURRENT_LINE, WINDOW, return_str=True, ignore_window=True
    )
    print(output)
|
||||
|
||||
|
||||
class LineNumberError(Exception):
    # Marker exception type; it adds nothing beyond Exception.
    ...
|
||||
|
||||
|
||||
def search_dir(search_term: str, dir_path: str = './') -> None:
    """Searches for search_term in all files in dir. If dir is not provided, searches in the current directory.

    Files whose name starts with a dot are skipped. Nothing is listed when more
    than 100 files contain a match.

    Args:
        search_term: str: The term to search for.
        dir_path: str: The path to the directory to search.
    """
    if not os.path.isdir(dir_path):
        _output_error(f'Directory {dir_path} not found')
        return

    hits = []
    for folder, _subdirs, names in os.walk(dir_path):
        for name in names:
            if name.startswith('.'):
                continue
            candidate = os.path.join(folder, name)
            with open(candidate, 'r', errors='ignore') as handle:
                hits.extend(
                    (candidate, number, text.strip())
                    for number, text in enumerate(handle, 1)
                    if search_term in text
                )

    if not hits:
        print(f'No matches found for "{search_term}" in {dir_path}')
        return

    file_total = len({hit[0] for hit in hits})
    if file_total > 100:
        print(
            f'More than {file_total} files matched for "{search_term}" in {dir_path}. Please narrow your search.'
        )
        return

    print(f'[Found {len(hits)} matches for "{search_term}" in {dir_path}]')
    for matched_path, number, text in hits:
        print(f'{matched_path} (Line {number}): {text}')
    print(f'[End of matches for "{search_term}" in {dir_path}]')
|
||||
|
||||
|
||||
def search_file(search_term: str, file_path: str | None = None) -> None:
|
||||
"""Searches for search_term in file. If file is not provided, searches in the current open file.
|
||||
|
||||
Args:
|
||||
search_term: str: The term to search for.
|
||||
file_path: str | None: The path to the file to search.
|
||||
"""
|
||||
global CURRENT_FILE
|
||||
if file_path is None:
|
||||
file_path = CURRENT_FILE
|
||||
if file_path is None:
|
||||
_output_error('No file specified or open. Use the open_file function first.')
|
||||
return
|
||||
if not os.path.isfile(file_path):
|
||||
_output_error(f'File {file_path} not found.')
|
||||
return
|
||||
|
||||
matches = []
|
||||
with open(file_path) as file:
|
||||
for i, line in enumerate(file, 1):
|
||||
if search_term in line:
|
||||
matches.append((i, line.strip()))
|
||||
|
||||
if matches:
|
||||
print(f'[Found {len(matches)} matches for "{search_term}" in {file_path}]')
|
||||
for match in matches:
|
||||
print(f'Line {match[0]}: {match[1]}')
|
||||
print(f'[End of matches for "{search_term}" in {file_path}]')
|
||||
else:
|
||||
print(f'[No matches found for "{search_term}" in {file_path}]')
|
||||
|
||||
|
||||
def find_file(file_name: str, dir_path: str = './') -> None:
    """Finds all files with the given name in the specified directory.

    A file matches when ``file_name`` occurs anywhere in its name.

    Args:
        file_name: str: The name of the file to find.
        dir_path: str: The path to the directory to search.
    """
    if not os.path.isdir(dir_path):
        _output_error(f'Directory {dir_path} not found')
        return

    found = [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(dir_path)
        for name in names
        if file_name in name
    ]

    if not found:
        print(f'[No matches found for "{file_name}" in {dir_path}]')
        return

    print(f'[Found {len(found)} matches for "{file_name}" in {dir_path}]')
    for located in found:
        print(f'{located}')
    print(f'[End of matches for "{file_name}" in {dir_path}]')
|
||||
|
||||
|
||||
__all__ = [
|
||||
'open_file',
|
||||
'goto_line',
|
||||
'scroll_down',
|
||||
'scroll_up',
|
||||
'search_dir',
|
||||
'search_file',
|
||||
'find_file',
|
||||
]
|
||||
@@ -1,7 +0,0 @@
|
||||
from openhands.runtime.plugins.agent_skills.file_reader import file_readers
|
||||
from openhands.runtime.plugins.agent_skills.utils.dependency import import_functions
|
||||
|
||||
import_functions(
|
||||
module=file_readers, function_names=file_readers.__all__, target_globals=globals()
|
||||
)
|
||||
__all__ = file_readers.__all__
|
||||
@@ -1,244 +0,0 @@
|
||||
"""File reader skills for the OpenHands agent.
|
||||
|
||||
This module provides various functions to parse and extract content from different file types,
|
||||
including PDF, DOCX, LaTeX, audio, image, video, and PowerPoint files. It utilizes different
|
||||
libraries and APIs to process these files and output their content or descriptions.
|
||||
|
||||
Functions:
|
||||
parse_pdf(file_path: str) -> None: Parse and print content of a PDF file.
|
||||
parse_docx(file_path: str) -> None: Parse and print content of a DOCX file.
|
||||
parse_latex(file_path: str) -> None: Parse and print content of a LaTeX file.
|
||||
parse_audio(file_path: str, model: str = 'whisper-1') -> None: Transcribe and print content of an audio file.
|
||||
parse_image(file_path: str, task: str = 'Describe this image as detail as possible.') -> None: Analyze and print description of an image file.
|
||||
parse_video(file_path: str, task: str = 'Describe this image as detail as possible.', frame_interval: int = 30) -> None: Analyze and print description of video frames.
|
||||
parse_pptx(file_path: str) -> None: Parse and print content of a PowerPoint file.
|
||||
|
||||
Note:
|
||||
Some functions (parse_audio, parse_video, parse_image) require OpenAI API credentials
|
||||
and are only available if the necessary environment variables are set.
|
||||
"""
|
||||
|
||||
import base64
|
||||
|
||||
import docx
|
||||
import PyPDF2
|
||||
from pptx import Presentation
|
||||
from pylatexenc.latex2text import LatexNodes2Text
|
||||
|
||||
from openhands.runtime.plugins.agent_skills.utils.config import (
|
||||
_get_max_token,
|
||||
_get_openai_api_key,
|
||||
_get_openai_base_url,
|
||||
_get_openai_client,
|
||||
_get_openai_model,
|
||||
)
|
||||
|
||||
|
||||
def parse_pdf(file_path: str) -> None:
    """Parses the content of a PDF file and prints it.

    Each page's extracted text is preceded by an ``@@ Page N @@`` marker.

    Args:
        file_path: str: The path to the file to open.
    """
    print(f'[Reading PDF file from {file_path}]')
    reader = PyPDF2.PdfReader(file_path)
    chunks = []
    for page_idx in range(len(reader.pages)):
        page_text = reader.pages[page_idx].extract_text()
        chunks.append(f'@@ Page {page_idx + 1} @@\n' + page_text + '\n\n')
    print(''.join(chunks).strip())
|
||||
|
||||
|
||||
def parse_docx(file_path: str) -> None:
    """Parses the content of a DOCX file and prints it.

    Every paragraph is printed under its own ``@@ Page N @@`` marker, numbered
    from 1.

    Args:
        file_path: str: The path to the file to open.
    """
    print(f'[Reading DOCX file from {file_path}]')
    document = docx.Document(file_path)
    sections = [
        f'@@ Page {number} @@\n' + paragraph.text + '\n\n'
        for number, paragraph in enumerate(document.paragraphs, 1)
    ]
    print(''.join(sections))
|
||||
|
||||
|
||||
def parse_latex(file_path: str) -> None:
    """Parses the content of a LaTex file and prints it.

    Args:
        file_path: str: The path to the file to open.
    """
    print(f'[Reading LaTex file from {file_path}]')
    with open(file_path) as source:
        raw = source.read()
    converter = LatexNodes2Text()
    print(converter.latex_to_text(raw).strip())
|
||||
|
||||
|
||||
def _base64_img(file_path: str) -> str:
    """Read ``file_path`` as bytes and return its base64 encoding as a UTF-8 string."""
    with open(file_path, 'rb') as image_file:
        raw_bytes = image_file.read()
    return base64.b64encode(raw_bytes).decode('utf-8')
|
||||
|
||||
|
||||
def _base64_video(file_path: str, frame_interval: int = 10) -> list[str]:
    """Return base64-encoded JPEGs of every ``frame_interval``-th frame of a video.

    Frames are counted from 0, so the first frame read is always included.
    """
    import cv2

    capture = cv2.VideoCapture(file_path)
    encoded_frames = []
    position = 0
    while capture.isOpened():
        ok, image = capture.read()
        if not ok:
            break
        if position % frame_interval == 0:
            jpeg_buffer = cv2.imencode('.jpg', image)[1]
            encoded_frames.append(base64.b64encode(jpeg_buffer).decode('utf-8'))
        position += 1
    capture.release()
    return encoded_frames
|
||||
|
||||
|
||||
def _prepare_image_messages(task: str, base64_image: str):
    """Build a one-element message list holding ``task`` and the image as a JPEG data URL."""
    text_part = {'type': 'text', 'text': task}
    image_part = {
        'type': 'image_url',
        'image_url': {'url': f'data:image/jpeg;base64,{base64_image}'},
    }
    user_message = {'role': 'user', 'content': [text_part, image_part]}
    return [user_message]
|
||||
|
||||
|
||||
def parse_audio(file_path: str, model: str = 'whisper-1') -> None:
    """Parses the content of an audio file and prints it.

    Any exception raised while reading the file or calling the API is printed
    instead of being raised.

    Args:
        file_path: str: The path to the audio file to transcribe.
        model: str: The audio model to use for transcription. Defaults to 'whisper-1'.
    """
    print(f'[Transcribing audio file from {file_path}]')
    try:
        # TODO: record the COST of the API call
        with open(file_path, 'rb') as audio_file:
            translations_api = _get_openai_client().audio.translations
            transcript = translations_api.create(model=model, file=audio_file)
        print(transcript.text)
    except Exception as e:
        print(f'Error transcribing audio file: {e}')
|
||||
|
||||
|
||||
def parse_image(
    file_path: str, task: str = 'Describe this image as detail as possible.'
) -> None:
    """Parses the content of an image file and prints the description.

    Any exception raised while reading the file or calling the API is printed
    instead of being raised.

    Args:
        file_path: str: The path to the file to open.
        task: str: The task description for the API call. Defaults to 'Describe this image as detail as possible.'.
    """
    print(f'[Reading image file from {file_path}]')
    # TODO: record the COST of the API call
    try:
        encoded = _base64_img(file_path)
        create_completion = _get_openai_client().chat.completions.create
        request = {
            'model': _get_openai_model(),
            'messages': _prepare_image_messages(task, encoded),
            'max_tokens': _get_max_token(),
        }
        response = create_completion(**request)
        print(response.choices[0].message.content)
    except Exception as error:
        print(f'Error with the request: {error}')
|
||||
|
||||
|
||||
def parse_video(
    file_path: str,
    task: str = 'Describe this image as detail as possible.',
    frame_interval: int = 30,
) -> None:
    """Parses the content of a video file and prints a description of sampled frames.

    Frames are first extracted by ``_base64_video`` (with its own default
    sampling), then every ``frame_interval``-th extracted frame is analyzed.
    At most 30 frames are sent to the API; when the requested interval would
    select more, the interval is widened.

    Args:
        file_path: str: The path to the video file to open.
        task: str: The task description for the API call. Defaults to 'Describe this image as detail as possible.'.
        frame_interval: int: The interval between frames to analyze. Defaults to 30.
    """
    print(
        f'[Processing video file from {file_path} with frame interval {frame_interval}]'
    )

    task = task or 'This is one frame from a video, please summarize this frame.'
    base64_frames = _base64_video(file_path)

    max_frames = 30
    # A zero step makes slicing raise and a negative one reverses the frames,
    # so never go below 1.
    step = max(1, frame_interval)
    if len(base64_frames[::step]) > max_frames:
        # Ceiling division: floor division could still select more than
        # max_frames frames (e.g. 95 frames // 30 -> step 3 -> 32 frames).
        step = -(-len(base64_frames) // max_frames)
    selected_frames = base64_frames[::step]

    print(f'Totally {len(selected_frames)} would be analyze...\n')

    for idx, base64_frame in enumerate(selected_frames, 1):
        # Position of this frame within the extracted frame list, computed with
        # the step actually used rather than the requested interval.
        frame_no = (idx - 1) * step
        print(f'Process the {file_path}, current No. {frame_no} frame...')
        # TODO: record the COST of the API call
        try:
            response = _get_openai_client().chat.completions.create(
                model=_get_openai_model(),
                messages=_prepare_image_messages(task, base64_frame),
                max_tokens=_get_max_token(),
            )

            content = response.choices[0].message.content
            current_frame_content = f"Frame {idx}'s content: {content}\n"
            print(current_frame_content)

        except Exception as error:
            # Best effort: report the failure and continue with the next frame.
            print(f'Error with the request: {error}')
|
||||
|
||||
|
||||
def parse_pptx(file_path: str) -> None:
    """Parses the content of a pptx file and prints it.

    Each slide is introduced by an ``@@ Slide N @@`` marker followed by the text
    of every shape that has a ``text`` attribute. Errors are printed instead of
    being raised.

    Args:
        file_path: str: The path to the file to open.
    """
    print(f'[Reading PowerPoint file from {file_path}]')
    try:
        deck = Presentation(str(file_path))
        collected = []
        for number, slide in enumerate(deck.slides, 1):
            collected.append(f'@@ Slide {number} @@')
            collected.extend(
                shape.text for shape in slide.shapes if hasattr(shape, 'text')
            )
        print('\n'.join(collected))
    except Exception as e:
        print(f'Error reading PowerPoint file: {e}')
|
||||
|
||||
|
||||
__all__ = [
|
||||
'parse_pdf',
|
||||
'parse_docx',
|
||||
'parse_latex',
|
||||
'parse_pptx',
|
||||
]
|
||||
|
||||
# This is called from OpenHands's side
|
||||
# If SANDBOX_ENV_OPENAI_API_KEY is set, we will be able to use these tools in the sandbox environment
|
||||
if _get_openai_api_key() and _get_openai_base_url():
|
||||
__all__ += ['parse_audio', 'parse_video', 'parse_image']
|
||||
@@ -1,30 +0,0 @@
|
||||
import os
|
||||
|
||||
from openai import OpenAI
|
||||
|
||||
|
||||
# ==================================================================================================
|
||||
# OPENAI
|
||||
# TODO: Move this to EventStream Actions when DockerRuntime is fully implemented
|
||||
# NOTE: we need to get env vars inside functions because they will be set in IPython
|
||||
# AFTER the agentskills is imported (the case for DockerRuntime)
|
||||
# ==================================================================================================
|
||||
def _get_openai_api_key():
    """Return the OpenAI API key read from the environment.

    ``OPENAI_API_KEY`` wins when set; otherwise ``SANDBOX_ENV_OPENAI_API_KEY``
    is used, and an empty string is returned when neither is set.
    """
    sandbox_key = os.getenv('SANDBOX_ENV_OPENAI_API_KEY', '')
    return os.getenv('OPENAI_API_KEY', sandbox_key)
|
||||
|
||||
|
||||
def _get_openai_base_url():
    """Return ``OPENAI_BASE_URL`` from the environment, or the public OpenAI v1 URL."""
    default_url = 'https://api.openai.com/v1'
    return os.getenv('OPENAI_BASE_URL', default_url)
|
||||
|
||||
|
||||
def _get_openai_model():
    """Return ``OPENAI_MODEL`` from the environment, defaulting to ``gpt-4o``."""
    default_model = 'gpt-4o'
    return os.getenv('OPENAI_MODEL', default_model)
|
||||
|
||||
|
||||
def _get_max_token():
    """Return the completion token limit as an integer.

    The limit is read from the ``MAX_TOKEN`` environment variable and defaults
    to 500. Environment values are always strings, so the value is converted
    explicitly; without the conversion the function would return ``str`` when
    the variable is set and ``int`` when it is not.

    Returns:
        int: The configured token limit.

    Raises:
        ValueError: If ``MAX_TOKEN`` is set to something that is not an integer.
    """
    raw_value = os.getenv('MAX_TOKEN', 500)
    try:
        return int(raw_value)
    except (TypeError, ValueError) as err:
        raise ValueError(
            f'MAX_TOKEN must be an integer, got {raw_value!r}'
        ) from err
|
||||
|
||||
|
||||
def _get_openai_client():
    """Build a new ``OpenAI`` client from the current environment settings.

    The API key and base URL are resolved on every call, so values exported
    after import time are honoured.
    """
    settings = {
        'api_key': _get_openai_api_key(),
        'base_url': _get_openai_base_url(),
    }
    return OpenAI(**settings)
|
||||
@@ -1,11 +0,0 @@
|
||||
from types import ModuleType
|
||||
|
||||
|
||||
def import_functions(
    module: ModuleType, function_names: list[str], target_globals: dict
) -> None:
    """Copy the named attributes of ``module`` into ``target_globals``.

    Names are processed in order. The first name missing from ``module``
    raises ``ValueError``; names handled before it stay in ``target_globals``.

    Args:
        module: The module to read attributes from.
        function_names: Attribute names to copy.
        target_globals: Mapping that receives ``name -> attribute`` entries.

    Raises:
        ValueError: If a requested name is not an attribute of ``module``.
    """
    for wanted in function_names:
        if not hasattr(module, wanted):
            raise ValueError(f'Function {wanted} not found in {module.__name__}')
        target_globals[wanted] = getattr(module, wanted)
|
||||
@@ -49,10 +49,7 @@ bashlex = "^0.18"
|
||||
pyjwt = "^2.9.0"
|
||||
dirhash = "*"
|
||||
python-frontmatter = "^1.1.0"
|
||||
python-docx = "*"
|
||||
PyPDF2 = "*"
|
||||
python-pptx = "*"
|
||||
pylatexenc = "*"
|
||||
|
||||
tornado = "*"
|
||||
python-dotenv = "*"
|
||||
pylcs = "^0.1.1"
|
||||
|
||||
@@ -1,717 +0,0 @@
|
||||
import contextlib
|
||||
import io
|
||||
import sys
|
||||
|
||||
import docx
|
||||
import pytest
|
||||
|
||||
from openhands.runtime.plugins.agent_skills.file_ops.file_ops import (
|
||||
WINDOW,
|
||||
_print_window,
|
||||
find_file,
|
||||
goto_line,
|
||||
open_file,
|
||||
scroll_down,
|
||||
scroll_up,
|
||||
search_dir,
|
||||
search_file,
|
||||
)
|
||||
from openhands.runtime.plugins.agent_skills.file_reader.file_readers import (
|
||||
parse_docx,
|
||||
parse_latex,
|
||||
parse_pdf,
|
||||
parse_pptx,
|
||||
)
|
||||
|
||||
|
||||
# CURRENT_FILE must be reset for each test
|
||||
@pytest.fixture(autouse=True)
def reset_current_file():
    """Clear ``agentskills.CURRENT_FILE`` before every test in this module."""
    from openhands.runtime.plugins.agent_skills import agentskills as skills_module

    skills_module.CURRENT_FILE = None
|
||||
|
||||
|
||||
def _numbered_test_lines(start, end) -> str:
    """Return ``'<n>|'`` for each n in ``start..end`` (inclusive), newline-separated.

    The result always ends with a newline, including when the range is empty.
    """
    numbered = [f'{number}|' for number in range(start, end + 1)]
    return '\n'.join(numbered) + '\n'
|
||||
|
||||
|
||||
def _generate_test_file_with_lines(temp_path, num_lines) -> str:
    """Create ``test_file.py`` under ``temp_path`` containing ``num_lines`` newlines.

    Returns the value of ``temp_path / 'test_file.py'`` (a path object when
    ``temp_path`` is one, despite the ``str`` annotation).
    """
    python_file = temp_path / 'test_file.py'
    blank_content = '\n' * num_lines
    python_file.write_text(blank_content)
    return python_file
|
||||
|
||||
|
||||
def _generate_ruby_test_file_with_lines(temp_path, num_lines) -> str:
    """Create ``test_file.rb`` under ``temp_path`` containing ``num_lines`` newlines.

    Returns the value of ``temp_path / 'test_file.rb'`` (a path object when
    ``temp_path`` is one, despite the ``str`` annotation).
    """
    ruby_file = temp_path / 'test_file.rb'
    blank_content = '\n' * num_lines
    ruby_file.write_text(blank_content)
    return ruby_file
|
||||
|
||||
|
||||
def _calculate_window_bounds(current_line, total_lines, window_size):
    """Calculate the bounds of the window around the current line.

    The window extends ``window_size // 2`` lines on each side of
    ``current_line``. If the lower edge would be negative, the window is
    ``1..window_size`` instead. ``total_lines`` is accepted but not used.

    Returns:
        A ``(start, end)`` tuple of line numbers.
    """
    half_span = window_size // 2
    lower = current_line - half_span
    if lower < 0:
        return 1, window_size
    return lower, current_line + half_span
|
||||
|
||||
|
||||
def _capture_file_operation_error(operation, expected_error_msg):
    """Run ``operation`` and assert that its stripped stdout equals the expected message.

    Args:
        operation: Zero-argument callable whose printed output is captured.
        expected_error_msg: Exact text expected after stripping whitespace.
    """
    sink = io.StringIO()
    with sink, contextlib.redirect_stdout(sink):
        operation()
        printed = sink.getvalue().strip()
    assert printed == expected_error_msg
|
||||
|
||||
|
||||
# Separator line: 49 dashes followed by a newline.
SEP = ''.join(['-' * 49, '\n'])
|
||||
|
||||
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def test_open_file_unexist_path():
    """open_file on a missing path prints the file-not-found error."""

    def attempt_open():
        open_file('/unexist/path/a.txt')

    _capture_file_operation_error(
        attempt_open,
        'ERROR: File /unexist/path/a.txt not found.',
    )
|
||||
|
||||
|
||||
def test_open_file(tmp_path):
    """A five-line file is printed whole, framed by begin/end markers."""
    assert tmp_path is not None
    target = tmp_path / 'a.txt'
    target.write_text('Line 1\nLine 2\nLine 3\nLine 4\nLine 5')

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        open_file(str(target))
    result = captured.getvalue()
    assert result is not None

    expected = ''.join(
        [
            f'[File: {target} (5 lines total)]\n',
            '(this is the beginning of the file)\n',
            '1|Line 1\n',
            '2|Line 2\n',
            '3|Line 3\n',
            '4|Line 4\n',
            '5|Line 5\n',
            '(this is the end of the file)\n',
        ]
    )
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_open_file_with_indentation(tmp_path):
    """Leading whitespace on a line is preserved in the printed window."""
    target = tmp_path / 'a.txt'
    target.write_text('Line 1\n Line 2\nLine 3\nLine 4\nLine 5')

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        open_file(str(target))
    result = captured.getvalue()
    assert result is not None

    expected = ''.join(
        [
            f'[File: {target} (5 lines total)]\n',
            '(this is the beginning of the file)\n',
            '1|Line 1\n',
            '2| Line 2\n',
            '3|Line 3\n',
            '4|Line 4\n',
            '5|Line 5\n',
            '(this is the end of the file)\n',
        ]
    )
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_open_file_long(tmp_path):
    """Opening a 1000-line file at line 1 with a 50-line context shows lines 1-50."""
    target = tmp_path / 'a.txt'
    target.write_text('\n'.join([f'Line {i}' for i in range(1, 1001)]))

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        open_file(str(target), 1, 50)
    result = captured.getvalue()
    assert result is not None

    expected = ''.join(
        [
            f'[File: {target} (1000 lines total)]\n',
            '(this is the beginning of the file)\n',
            *(f'{i}|Line {i}\n' for i in range(1, 51)),
            '(950 more lines below)\n',
            '[Use `scroll_down` to view the next 100 lines of the file!]\n',
        ]
    )
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_open_file_long_with_lineno(tmp_path):
    """Opening a 1000-line file at line 100 shows the window computed for that line."""
    target = tmp_path / 'a.txt'
    target.write_text('\n'.join([f'Line {i}' for i in range(1, 1001)]))

    cur_line = 100

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        open_file(str(target), cur_line)
    result = captured.getvalue()
    assert result is not None

    # The expected window edges come from the same helper the other tests use.
    start, end = _calculate_window_bounds(cur_line, 1000, WINDOW)
    parts = [f'[File: {target} (1000 lines total)]\n']
    parts.append(
        '(this is the beginning of the file)\n'
        if start == 1
        else f'({start - 1} more lines above)\n'
    )
    parts.extend(f'{i}|Line {i}\n' for i in range(start, end + 1))
    if end == 1000:
        parts.append('(this is the end of the file)\n')
    else:
        parts.append(f'({1000 - end} more lines below)\n')
        parts.append('[Use `scroll_down` to view the next 100 lines of the file!]\n')
    expected = ''.join(parts)
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_goto_line(tmp_path):
    """goto_line(500) re-centres the window of an already opened 1000-line file."""
    target = tmp_path / 'a.txt'
    total_lines = 1000
    target.write_text('\n'.join([f'Line {i}' for i in range(1, total_lines + 1)]))
    header = f'[File: {target} ({total_lines} lines total)]\n'

    # Phase 1: a plain open shows the first WINDOW lines.
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        open_file(str(target))
    result = captured.getvalue()
    assert result is not None

    expected = ''.join(
        [
            header,
            '(this is the beginning of the file)\n',
            *(f'{i}|Line {i}\n' for i in range(1, WINDOW + 1)),
            f'({total_lines - WINDOW} more lines below)\n',
            '[Use `scroll_down` to view the next 100 lines of the file!]\n',
        ]
    )
    assert result.split('\n') == expected.split('\n')

    # Phase 2: jump to line 500 and compare against the computed window.
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        goto_line(500)
    result = captured.getvalue()
    assert result is not None

    cur_line = 500
    start, end = _calculate_window_bounds(cur_line, total_lines, WINDOW)
    parts = [header]
    parts.append(
        '(this is the beginning of the file)\n'
        if start == 1
        else f'({start - 1} more lines above)\n'
    )
    parts.extend(f'{i}|Line {i}\n' for i in range(start, end + 1))
    parts.append(
        '(this is the end of the file)\n'
        if end == total_lines
        else f'({total_lines - end} more lines below)\n'
    )
    expected = ''.join(parts)
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_goto_line_negative(tmp_path):
    """goto_line rejects a negative line number for a four-line file."""
    target = tmp_path / 'a.txt'
    target.write_text('\n'.join([f'Line {i}' for i in range(1, 5)]))

    # Open the file first; its output is captured and discarded.
    with contextlib.redirect_stdout(io.StringIO()):
        open_file(str(target))

    def jump():
        goto_line(-1)

    _capture_file_operation_error(
        jump, 'ERROR: Line number must be between 1 and 4.'
    )
|
||||
|
||||
|
||||
def test_goto_line_out_of_bound(tmp_path):
    """goto_line rejects a line number past the end of a nine-line file."""
    target = tmp_path / 'a.txt'
    target.write_text('\n'.join([f'Line {i}' for i in range(1, 10)]))

    # Open the file first; its output is captured and discarded.
    with contextlib.redirect_stdout(io.StringIO()):
        open_file(str(target))

    def jump():
        goto_line(100)

    _capture_file_operation_error(
        jump, 'ERROR: Line number must be between 1 and 9.'
    )
|
||||
|
||||
|
||||
def test_scroll_down(tmp_path):
    """scroll_down moves the window of a 1000-line file forward by WINDOW lines."""
    target = tmp_path / 'a.txt'
    total_lines = 1000
    target.write_text('\n'.join([f'Line {i}' for i in range(1, total_lines + 1)]))

    def render(start, end, with_hint):
        # Build the text expected for the window start..end (inclusive).
        parts = [f'[File: {target} ({total_lines} lines total)]\n']
        parts.append(
            '(this is the beginning of the file)\n'
            if start == 1
            else f'({start - 1} more lines above)\n'
        )
        parts.extend(f'{i}|Line {i}\n' for i in range(start, end + 1))
        if end == total_lines:
            parts.append('(this is the end of the file)\n')
        else:
            parts.append(f'({total_lines - end} more lines below)\n')
            if with_hint:
                parts.append(
                    '[Use `scroll_down` to view the next 100 lines of the file!]\n'
                )
        return ''.join(parts)

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        open_file(str(target))
    result = captured.getvalue()
    assert result is not None

    first_start, first_end = _calculate_window_bounds(1, total_lines, WINDOW)
    expected = render(first_start, first_end, with_hint=True)
    assert result.split('\n') == expected.split('\n')

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        scroll_down()
    result = captured.getvalue()
    assert result is not None

    expected = render(WINDOW + 1, 2 * WINDOW + 1, with_hint=False)
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_scroll_up(tmp_path):
    """scroll_up moves the window of a 1000-line file back by WINDOW lines."""
    target = tmp_path / 'a.txt'
    total_lines = 1000
    target.write_text('\n'.join([f'Line {i}' for i in range(1, total_lines + 1)]))

    def render(start, end, with_hint):
        # Build the text expected for the window start..end (inclusive).
        parts = [f'[File: {target} ({total_lines} lines total)]\n']
        parts.append(
            '(this is the beginning of the file)\n'
            if start == 1
            else f'({start - 1} more lines above)\n'
        )
        parts.extend(f'{i}|Line {i}\n' for i in range(start, end + 1))
        if end == total_lines:
            parts.append('(this is the end of the file)\n')
        else:
            parts.append(f'({total_lines - end} more lines below)\n')
            if with_hint:
                parts.append(
                    '[Use `scroll_down` to view the next 100 lines of the file!]\n'
                )
        return ''.join(parts)

    cur_line = 300
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        open_file(str(target), cur_line)
    result = captured.getvalue()
    assert result is not None

    opened_start, opened_end = _calculate_window_bounds(cur_line, total_lines, WINDOW)
    expected = render(opened_start, opened_end, with_hint=True)
    assert result.split('\n') == expected.split('\n')

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        scroll_up()
    result = captured.getvalue()
    assert result is not None

    cur_line = cur_line - WINDOW
    expected = render(cur_line, cur_line + WINDOW, with_hint=False)
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_scroll_down_edge(tmp_path):
    """scroll_down on a nine-line file still prints the whole file."""
    target = tmp_path / 'a.txt'
    target.write_text('\n'.join([f'Line {i}' for i in range(1, 10)]))

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        open_file(str(target))
    result = captured.getvalue()
    assert result is not None

    expected = ''.join(
        [
            f'[File: {target} (9 lines total)]\n',
            '(this is the beginning of the file)\n',
            *(f'{i}|Line {i}\n' for i in range(1, 10)),
            '(this is the end of the file)\n',
        ]
    )

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        scroll_down()
    result = captured.getvalue()
    assert result is not None

    # expected should be unchanged
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_print_window_internal(tmp_path):
    """_print_window prints a narrow window correctly for lines containing backticks."""
    test_file_path = tmp_path / 'a.txt'
    test_file_path.write_text('')
    open_file(str(test_file_path))
    # Fill the file only after it has been opened once while empty.
    with open(test_file_path, 'w') as file:
        file.writelines(f'Line `{i}`\n' for i in range(1, 101))

    # Define the parameters for the test
    current_line, window = 50, 2

    # Test _print_window especially with backticks
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        _print_window(str(test_file_path), current_line, window, return_str=False)
    result = captured.getvalue()

    expected = ''.join(
        [
            '(48 more lines above)\n',
            '49|Line `49`\n',
            '50|Line `50`\n',
            '51|Line `51`\n',
            '(49 more lines below)\n',
        ]
    )
    assert result == expected
|
||||
|
||||
|
||||
def test_open_file_large_line_number(tmp_path):
    """open_file at line 800 of a 999-line file shows lines 750-850."""
    test_file_path = tmp_path / 'a.txt'
    test_file_path.write_text('')
    open_file(str(test_file_path))
    # Fill the file only after it has been opened once while empty.
    with open(test_file_path, 'w') as file:
        file.writelines(f'Line `{i}`\n' for i in range(1, 1000))

    # Define the parameters for the test
    current_line, window = 800, 100

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        open_file(str(test_file_path), current_line, window)
    result = captured.getvalue()

    expected = ''.join(
        [
            f'[File: {test_file_path} (999 lines total)]\n',
            '(749 more lines above)\n',
            *(f'{i}|Line `{i}`\n' for i in range(750, 850 + 1)),
            '(149 more lines below)\n',
            '[Use `scroll_down` to view the next 100 lines of the file!]\n',
        ]
    )
    assert result == expected
|
||||
|
||||
|
||||
def test_search_dir(tmp_path):
    """search_dir finds the single file, out of 100, that contains the term."""
    # create files with the search term "bingo"
    base_text = 'Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n'
    for index in range(1, 101):
        with open(tmp_path / f'a{index}.txt', 'w') as file:
            file.write(base_text)
            if index == 50:
                file.write('bingo')

    # test
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        search_dir('bingo', str(tmp_path))
    result = captured.getvalue()
    assert result is not None

    expected = ''.join(
        [
            f'[Found 1 matches for "bingo" in {tmp_path}]\n',
            f'{tmp_path}/a50.txt (Line 6): bingo\n',
            f'[End of matches for "bingo" in {tmp_path}]\n',
        ]
    )
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_search_dir_not_exist_term(tmp_path):
    """search_dir reports no matches when no file contains the term."""
    # create 100 files, none of which contains the search term
    base_text = 'Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n'
    for index in range(1, 101):
        with open(tmp_path / f'a{index}.txt', 'w') as file:
            file.write(base_text)

    # test
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        search_dir('non-exist', str(tmp_path))
    result = captured.getvalue()
    assert result is not None

    expected = f'No matches found for "non-exist" in {tmp_path}\n'
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_search_dir_too_much_match(tmp_path):
    """search_dir asks the caller to narrow the search when 999 files match."""
    # create files with the search term "Line 5"
    base_text = 'Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n'
    for index in range(1, 1000):
        with open(tmp_path / f'a{index}.txt', 'w') as file:
            file.write(base_text)

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        search_dir('Line 5', str(tmp_path))
    result = captured.getvalue()
    assert result is not None

    expected = f'More than 999 files matched for "Line 5" in {tmp_path}. Please narrow your search.\n'
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_search_dir_cwd(tmp_path, monkeypatch):
    """search_dir without a directory argument searches the current directory."""
    # Using pytest's monkeypatch to change directory without affecting other tests
    monkeypatch.chdir(tmp_path)
    # create files with the search term "bingo"
    base_text = 'Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n'
    for index in range(1, 101):
        with open(tmp_path / f'a{index}.txt', 'w') as file:
            file.write(base_text)
            if index == 50:
                file.write('bingo')

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        search_dir('bingo')
    result = captured.getvalue()
    assert result is not None

    expected = ''.join(
        [
            '[Found 1 matches for "bingo" in ./]\n',
            './a50.txt (Line 6): bingo\n',
            '[End of matches for "bingo" in ./]\n',
        ]
    )
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_search_file(tmp_path):
    """search_file reports the one line of the file that matches the term."""
    target = tmp_path / 'a.txt'
    target.write_text('Line 1\nLine 2\nLine 3\nLine 4\nLine 5')

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        search_file('Line 5', str(target))
    result = captured.getvalue()
    assert result is not None

    expected = ''.join(
        [
            f'[Found 1 matches for "Line 5" in {target}]\n',
            'Line 5: Line 5\n',
            f'[End of matches for "Line 5" in {target}]\n',
        ]
    )
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_search_file_not_exist_term(tmp_path):
    """search_file reports no matches when the term is absent from the file."""
    target = tmp_path / 'a.txt'
    target.write_text('Line 1\nLine 2\nLine 3\nLine 4\nLine 5')

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        search_file('Line 6', str(target))
    result = captured.getvalue()
    assert result is not None

    expected = f'[No matches found for "Line 6" in {target}]\n'
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_search_file_not_exist_file():
    """search_file on a missing path prints the file-not-found error."""

    def attempt_search():
        search_file('Line 6', '/unexist/path/a.txt')

    _capture_file_operation_error(
        attempt_search,
        'ERROR: File /unexist/path/a.txt not found.',
    )
|
||||
|
||||
|
||||
def test_find_file(tmp_path):
    """find_file locates a file by name inside the given directory."""
    target = tmp_path / 'a.txt'
    target.write_text('Line 1\nLine 2\nLine 3\nLine 4\nLine 5')

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        find_file('a.txt', str(tmp_path))
    result = captured.getvalue()
    assert result is not None

    expected = ''.join(
        [
            f'[Found 1 matches for "a.txt" in {tmp_path}]\n',
            f'{tmp_path}/a.txt\n',
            f'[End of matches for "a.txt" in {tmp_path}]\n',
        ]
    )
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_find_file_cwd(tmp_path, monkeypatch):
    """find_file without a directory argument runs from the current directory.

    Only checks that the captured output is not None.
    """
    monkeypatch.chdir(tmp_path)
    target = tmp_path / 'a.txt'
    target.write_text('Line 1\nLine 2\nLine 3\nLine 4\nLine 5')

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        find_file('a.txt')
    result = captured.getvalue()
    assert result is not None
|
||||
|
||||
|
||||
def test_find_file_not_exist_file():
    """find_file reports no matches for a name absent from the current directory."""
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        find_file('nonexist.txt')
    result = captured.getvalue()
    assert result is not None

    expected = '[No matches found for "nonexist.txt" in ./]\n'
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_find_file_not_exist_file_specific_path(tmp_path):
    """find_file reports no matches for a name absent from the given directory."""
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        find_file('nonexist.txt', str(tmp_path))
    result = captured.getvalue()
    assert result is not None

    expected = f'[No matches found for "nonexist.txt" in {tmp_path}]\n'
    assert result.split('\n') == expected.split('\n')
|
||||
|
||||
|
||||
def test_parse_docx(tmp_path):
    """parse_docx prints each paragraph of a generated DOCX under a page marker."""
    # Create a DOCX file with some content
    test_docx_path = tmp_path / 'test.docx'
    doc = docx.Document()
    doc.add_paragraph('Hello, this is a test document.')
    doc.add_paragraph('This is the second paragraph.')
    doc.save(str(test_docx_path))

    # redirect_stdout restores sys.stdout even when parse_docx raises, so a
    # failure here cannot leave stdout hijacked for the tests that follow.
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        parse_docx(str(test_docx_path))
    output = captured.getvalue()

    # Check if the output is correct
    expected_output = (
        f'[Reading DOCX file from {test_docx_path}]\n'
        '@@ Page 1 @@\nHello, this is a test document.\n\n'
        '@@ Page 2 @@\nThis is the second paragraph.\n\n\n'
    )
    assert output == expected_output, f'Expected output does not match. Got: {output}'
|
||||
|
||||
|
||||
def test_parse_latex(tmp_path):
    """parse_latex prints the plain text of a minimal LaTeX document."""
    # Create a LaTeX file with some content
    test_latex_path = tmp_path / 'test.tex'
    with open(test_latex_path, 'w') as f:
        f.write(r"""
\documentclass{article}
\begin{document}
Hello, this is a test LaTeX document.
\end{document}
""")

    # redirect_stdout restores sys.stdout even when parse_latex raises, so a
    # failure here cannot leave stdout hijacked for the tests that follow.
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        parse_latex(str(test_latex_path))
    output = captured.getvalue()

    # Check if the output is correct
    expected_output = (
        f'[Reading LaTex file from {test_latex_path}]\n'
        'Hello, this is a test LaTeX document.\n'
    )
    assert output == expected_output, f'Expected output does not match. Got: {output}'
|
||||
|
||||
|
||||
def test_parse_pdf(tmp_path):
    """parse_pdf prints the text of a one-page PDF under a page marker."""
    # Create a PDF file with some content
    test_pdf_path = tmp_path / 'test.pdf'
    from reportlab.lib.pagesizes import letter
    from reportlab.pdfgen import canvas

    c = canvas.Canvas(str(test_pdf_path), pagesize=letter)
    c.drawString(100, 750, 'Hello, this is a test PDF document.')
    c.save()

    # redirect_stdout restores sys.stdout even when parse_pdf raises, so a
    # failure here cannot leave stdout hijacked for the tests that follow.
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        parse_pdf(str(test_pdf_path))
    output = captured.getvalue()

    # Check if the output is correct
    expected_output = (
        f'[Reading PDF file from {test_pdf_path}]\n'
        '@@ Page 1 @@\n'
        'Hello, this is a test PDF document.\n'
    )
    assert output == expected_output, f'Expected output does not match. Got: {output}'
|
||||
|
||||
|
||||
def test_parse_pptx(tmp_path):
    """parse_pptx prints the title text of each slide under a slide marker."""
    test_pptx_path = tmp_path / 'test.pptx'
    from pptx import Presentation

    pres = Presentation()

    slide1 = pres.slides.add_slide(pres.slide_layouts[0])
    title1 = slide1.shapes.title
    title1.text = 'Hello, this is the first test PPTX slide.'

    slide2 = pres.slides.add_slide(pres.slide_layouts[0])
    title2 = slide2.shapes.title
    title2.text = 'Hello, this is the second test PPTX slide.'

    pres.save(str(test_pptx_path))

    # redirect_stdout restores sys.stdout even when parse_pptx raises, so a
    # failure here cannot leave stdout hijacked for the tests that follow.
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        parse_pptx(str(test_pptx_path))
    output = captured.getvalue()

    expected_output = (
        f'[Reading PowerPoint file from {test_pptx_path}]\n'
        '@@ Slide 1 @@\n'
        'Hello, this is the first test PPTX slide.\n\n'
        '@@ Slide 2 @@\n'
        'Hello, this is the second test PPTX slide.\n\n'
    )
    assert output == expected_output, f'Expected output does not match. Got: {output}'
|
||||
Reference in New Issue
Block a user