Compare commits

...

1 Commits

Author SHA1 Message Date
openhands
3e9985f19a Fix issue #6083: Clean up unused agent skills after function calling migration 2025-01-06 16:55:56 +00:00
18 changed files with 4 additions and 1538 deletions

View File

@@ -14,7 +14,7 @@ OpenHands includes and adapts the following open source projects. We are gratefu
#### [Aider](https://github.com/paul-gauthier/aider)
- License: Apache License 2.0
- Description: AI pair programming tool. OpenHands has adapted and integrated its linter module for code-related tasks in [`agentskills utilities`](https://github.com/All-Hands-AI/OpenHands/tree/main/openhands/runtime/plugins/agent_skills/utils/aider)
- Description: AI pair programming tool. OpenHands has adapted and integrated its linter module for code-related tasks
#### [BrowserGym](https://github.com/ServiceNow/BrowserGym)
- License: Apache License 2.0

View File

@@ -37,7 +37,6 @@ from openhands.events.observation.observation import Observation
from openhands.events.serialization.event import truncate_content
from openhands.llm.llm import LLM
from openhands.runtime.plugins import (
AgentSkillsRequirement,
JupyterRequirement,
PluginRequirement,
)
@@ -66,10 +65,6 @@ class CodeActAgent(Agent):
"""
sandbox_plugins: list[PluginRequirement] = [
# NOTE: AgentSkillsRequirement need to go before JupyterRequirement, since
# AgentSkillsRequirement provides a lot of Python functions,
# and it needs to be initialized before Jupyter for Jupyter to use those functions.
AgentSkillsRequirement(),
JupyterRequirement(),
]

View File

@@ -501,7 +501,7 @@ def response_to_actions(response: ModelResponse) -> list[Action]:
elif tool_call.function.name == 'edit_file':
action = FileEditAction(**arguments)
elif tool_call.function.name == 'str_replace_editor':
# We implement this in agent_skills, which can be used via Jupyter
# This is now implemented via function calling
# convert tool_call.function.arguments to kwargs that can be passed to file_editor
code = f'print(file_editor(**{arguments}))'
logger.debug(

View File

@@ -128,16 +128,7 @@ class ActionExecutor:
timeout=30,
)
# This is a temporary workaround
# TODO: refactor AgentSkills to be part of JupyterPlugin
# AFTER ServerRuntime is deprecated
if 'agent_skills' in self.plugins and 'jupyter' in self.plugins:
obs = await self.run_ipython(
IPythonRunCellAction(
code='from openhands.runtime.plugins.agent_skills.agentskills import *\n'
)
)
logger.debug(f'AgentSkills initialized: {obs}')
await self._init_bash_commands()
logger.debug('Runtime client initialized.')

View File

@@ -1,8 +1,4 @@
# Requirements
from openhands.runtime.plugins.agent_skills import (
AgentSkillsPlugin,
AgentSkillsRequirement,
)
from openhands.runtime.plugins.jupyter import JupyterPlugin, JupyterRequirement
from openhands.runtime.plugins.requirement import Plugin, PluginRequirement
from openhands.runtime.plugins.vscode import VSCodePlugin, VSCodeRequirement
@@ -10,8 +6,6 @@ from openhands.runtime.plugins.vscode import VSCodePlugin, VSCodeRequirement
__all__ = [
'Plugin',
'PluginRequirement',
'AgentSkillsRequirement',
'AgentSkillsPlugin',
'JupyterRequirement',
'JupyterPlugin',
'VSCodeRequirement',
@@ -20,6 +14,5 @@ __all__ = [
ALL_PLUGINS = {
'jupyter': JupyterPlugin,
'agent_skills': AgentSkillsPlugin,
'vscode': VSCodePlugin,
}

View File

@@ -1,57 +0,0 @@
# OpenHands Skill Sets
This folder implements a skill/tool set `agentskills` for OpenHands.
It is intended to be used by the agent **inside sandbox**.
The skill set will be exposed as a `pip` package that can be installed as a plugin inside the sandbox.
The skill set can contain a bunch of wrapped tools for agent ([many examples here](https://github.com/All-Hands-AI/OpenHands/pull/1914)), for example:
- Audio/Video to text (these are a temporary solution, and we should switch to multimodal models when they are sufficiently cheap
- PDF to text
- etc.
# Inclusion Criteria
We are walking a fine line here.
We DON't want to *wrap* every possible python packages and re-teach agent their usage (e.g., LLM already knows `pandas` pretty well, so we don't really need create a skill that reads `csv` - it can just use `pandas`).
We ONLY want to add a new skill, when:
- Such skill is not easily achievable for LLM to write code directly (e.g., edit code and replace certain line)
- It involves calling an external model (e.g., you need to call a speech to text model, editor model for speculative editing)
# Intended functionality
- Tool/skill usage (through `IPythonRunAction`)
```python
# In[1]
from agentskills import open_file, edit_file
open_file("/workspace/a.txt")
# Out[1]
[SWE-agent open output]
# In[2]
edit_file(
"/workspace/a.txt",
start=1, end=3,
content=(
("REPLACE TEXT")
))
# Out[1]
[SWE-agent edit output]
```
- Tool/skill retrieval (through `IPythonRunAction`)
```python
# In[1]
from agentskills import help_me
help_me("I want to solve a task that involves reading a bunch of PDFs and reason about them")
# Out[1]
"Here are the top skills that may be helpful to you:
- `pdf_to_text`: [documentation about the tools]
...
"
```

View File

@@ -1,14 +0,0 @@
from dataclasses import dataclass
from openhands.runtime.plugins.agent_skills import agentskills
from openhands.runtime.plugins.requirement import Plugin, PluginRequirement
@dataclass
class AgentSkillsRequirement(PluginRequirement):
name: str = 'agent_skills'
documentation: str = agentskills.DOCUMENTATION
class AgentSkillsPlugin(Plugin):
name: str = 'agent_skills'

View File

@@ -1,31 +0,0 @@
from inspect import signature
from openhands.runtime.plugins.agent_skills import file_ops, file_reader
from openhands.runtime.plugins.agent_skills.utils.dependency import import_functions
import_functions(
module=file_ops, function_names=file_ops.__all__, target_globals=globals()
)
import_functions(
module=file_reader, function_names=file_reader.__all__, target_globals=globals()
)
__all__ = file_ops.__all__ + file_reader.__all__
DOCUMENTATION = ''
for func_name in __all__:
func = globals()[func_name]
cur_doc = func.__doc__
# remove indentation from docstring and extra empty lines
cur_doc = '\n'.join(filter(None, map(lambda x: x.strip(), cur_doc.split('\n'))))
# now add a consistent 4 indentation
cur_doc = '\n'.join(map(lambda x: ' ' * 4 + x, cur_doc.split('\n')))
fn_signature = f'{func.__name__}' + str(signature(func))
DOCUMENTATION += f'{fn_signature}:\n{cur_doc}\n\n'
# Add file_editor (a function)
from openhands.runtime.plugins.agent_skills.file_editor import file_editor # noqa: E402
__all__ += ['file_editor']

View File

@@ -1,3 +0,0 @@
# File Editor
This file editor is largely based on Anthorpic released [`str_replace_editor`](https://github.com/anthropics/anthropic-quickstarts/tree/main/computer-use-demo/computer_use_demo/tools/edit.py). The original code was released under [MIT license](https://github.com/anthropics/anthropic-quickstarts/blob/e373524f07594d48c3f9563248ea282a4c306c0c/LICENSE).

View File

@@ -1,8 +0,0 @@
"""This file imports a global singleton of the `EditTool` class as well as raw functions that expose
its __call__.
The implementation of the `EditTool` class can be found at: https://github.com/All-Hands-AI/openhands-aci/.
"""
from openhands_aci.editor import file_editor
__all__ = ['file_editor']

View File

@@ -1,7 +0,0 @@
from openhands.runtime.plugins.agent_skills.file_ops import file_ops
from openhands.runtime.plugins.agent_skills.utils.dependency import import_functions
import_functions(
module=file_ops, function_names=file_ops.__all__, target_globals=globals()
)
__all__ = file_ops.__all__

View File

@@ -1,381 +0,0 @@
"""file_ops.py
This module provides various file manipulation skills for the OpenHands agent.
Functions:
- open_file(path: str, line_number: int | None = 1, context_lines: int = 100): Opens a file and optionally moves to a specific line.
- goto_line(line_number: int): Moves the window to show the specified line number.
- scroll_down(): Moves the window down by the number of lines specified in WINDOW.
- scroll_up(): Moves the window up by the number of lines specified in WINDOW.
- search_dir(search_term: str, dir_path: str = './'): Searches for a term in all files in the specified directory.
- search_file(search_term: str, file_path: str | None = None): Searches for a term in the specified file or the currently open file.
- find_file(file_name: str, dir_path: str = './'): Finds all files with the given name in the specified directory.
"""
import os
from openhands.linter import DefaultLinter, LintResult
CURRENT_FILE: str | None = None
CURRENT_LINE = 1
WINDOW = 100
# This is also used in unit tests!
MSG_FILE_UPDATED = '[File updated (edited at line {line_number}). Please review the changes and make sure they are correct (correct indentation, no duplicate lines, etc). Edit the file again if necessary.]'
LINTER_ERROR_MSG = '[Your proposed edit has introduced new syntax error(s). Please understand the errors and retry your edit command.]\n'
# ==================================================================================================
def _output_error(error_msg: str) -> bool:
print(f'ERROR: {error_msg}')
return False
def _is_valid_filename(file_name) -> bool:
if not file_name or not isinstance(file_name, str) or not file_name.strip():
return False
invalid_chars = '<>:"/\\|?*'
if os.name == 'nt': # Windows
invalid_chars = '<>:"/\\|?*'
elif os.name == 'posix': # Unix-like systems
invalid_chars = '\0'
for char in invalid_chars:
if char in file_name:
return False
return True
def _is_valid_path(path) -> bool:
if not path or not isinstance(path, str):
return False
try:
return os.path.exists(os.path.normpath(path))
except PermissionError:
return False
def _create_paths(file_name) -> bool:
try:
dirname = os.path.dirname(file_name)
if dirname:
os.makedirs(dirname, exist_ok=True)
return True
except PermissionError:
return False
def _check_current_file(file_path: str | None = None) -> bool:
global CURRENT_FILE
if not file_path:
file_path = CURRENT_FILE
if not file_path or not os.path.isfile(file_path):
return _output_error('No file open. Use the open_file function first.')
return True
def _clamp(value, min_value, max_value):
return max(min_value, min(value, max_value))
def _lint_file(file_path: str) -> tuple[str | None, int | None]:
"""Lint the file at the given path and return a tuple with a boolean indicating if there are errors,
and the line number of the first error, if any.
Returns:
tuple[str | None, int | None]: (lint_error, first_error_line_number)
"""
linter = DefaultLinter()
lint_error: list[LintResult] = linter.lint(file_path)
if not lint_error:
# Linting successful. No issues found.
return None, None
first_error_line = lint_error[0].line if len(lint_error) > 0 else None
error_text = 'ERRORS:\n' + '\n'.join(
[f'{file_path}:{err.line}:{err.column}: {err.message}' for err in lint_error]
)
return error_text, first_error_line
def _print_window(
file_path, targeted_line, window, return_str=False, ignore_window=False
):
global CURRENT_LINE
_check_current_file(file_path)
with open(file_path) as file:
content = file.read()
# Ensure the content ends with a newline character
if not content.endswith('\n'):
content += '\n'
lines = content.splitlines(True) # Keep all line ending characters
total_lines = len(lines)
# cover edge cases
CURRENT_LINE = _clamp(targeted_line, 1, total_lines)
half_window = max(1, window // 2)
if ignore_window:
# Use CURRENT_LINE as starting line (for e.g. scroll_down)
start = max(1, CURRENT_LINE)
end = min(total_lines, CURRENT_LINE + window)
else:
# Ensure at least one line above and below the targeted line
start = max(1, CURRENT_LINE - half_window)
end = min(total_lines, CURRENT_LINE + half_window)
# Adjust start and end to ensure at least one line above and below
if start == 1:
end = min(total_lines, start + window - 1)
if end == total_lines:
start = max(1, end - window + 1)
output = ''
# only display this when there's at least one line above
if start > 1:
output += f'({start - 1} more lines above)\n'
else:
output += '(this is the beginning of the file)\n'
for i in range(start, end + 1):
_new_line = f'{i}|{lines[i-1]}'
if not _new_line.endswith('\n'):
_new_line += '\n'
output += _new_line
if end < total_lines:
output += f'({total_lines - end} more lines below)\n'
else:
output += '(this is the end of the file)\n'
output = output.rstrip()
if return_str:
return output
else:
print(output)
def _cur_file_header(current_file, total_lines) -> str:
if not current_file:
return ''
return f'[File: {os.path.abspath(current_file)} ({total_lines} lines total)]\n'
def open_file(
path: str, line_number: int | None = 1, context_lines: int | None = WINDOW
) -> None:
"""Opens the file at the given path in the editor. IF the file is to be edited, first use `scroll_down` repeatedly to read the full file!
If line_number is provided, the window will be moved to include that line.
It only shows the first 100 lines by default! `context_lines` is the max number of lines to be displayed, up to 100. Use `scroll_up` and `scroll_down` to view more content up or down.
Args:
path: str: The path to the file to open, preferred absolute path.
line_number: int | None = 1: The line number to move to. Defaults to 1.
context_lines: int | None = 100: Only shows this number of lines in the context window (usually from line 1), with line_number as the center (if possible). Defaults to 100.
"""
global CURRENT_FILE, CURRENT_LINE, WINDOW
if not os.path.isfile(path):
_output_error(f'File {path} not found.')
return
CURRENT_FILE = os.path.abspath(path)
with open(CURRENT_FILE) as file:
total_lines = max(1, sum(1 for _ in file))
if not isinstance(line_number, int) or line_number < 1 or line_number > total_lines:
_output_error(f'Line number must be between 1 and {total_lines}')
return
CURRENT_LINE = line_number
# Override WINDOW with context_lines
if context_lines is None or context_lines < 1:
context_lines = WINDOW
output = _cur_file_header(CURRENT_FILE, total_lines)
output += _print_window(
CURRENT_FILE,
CURRENT_LINE,
_clamp(context_lines, 1, 100),
return_str=True,
ignore_window=False,
)
if output.strip().endswith('more lines below)'):
output += '\n[Use `scroll_down` to view the next 100 lines of the file!]'
print(output)
def goto_line(line_number: int) -> None:
"""Moves the window to show the specified line number.
Args:
line_number: int: The line number to move to.
"""
global CURRENT_FILE, CURRENT_LINE, WINDOW
_check_current_file()
with open(str(CURRENT_FILE)) as file:
total_lines = max(1, sum(1 for _ in file))
if not isinstance(line_number, int) or line_number < 1 or line_number > total_lines:
_output_error(f'Line number must be between 1 and {total_lines}.')
return
CURRENT_LINE = _clamp(line_number, 1, total_lines)
output = _cur_file_header(CURRENT_FILE, total_lines)
output += _print_window(
CURRENT_FILE, CURRENT_LINE, WINDOW, return_str=True, ignore_window=False
)
print(output)
def scroll_down() -> None:
"""Moves the window down by 100 lines.
Args:
None
"""
global CURRENT_FILE, CURRENT_LINE, WINDOW
_check_current_file()
with open(str(CURRENT_FILE)) as file:
total_lines = max(1, sum(1 for _ in file))
CURRENT_LINE = _clamp(CURRENT_LINE + WINDOW, 1, total_lines)
output = _cur_file_header(CURRENT_FILE, total_lines)
output += _print_window(
CURRENT_FILE, CURRENT_LINE, WINDOW, return_str=True, ignore_window=True
)
print(output)
def scroll_up() -> None:
"""Moves the window up by 100 lines.
Args:
None
"""
global CURRENT_FILE, CURRENT_LINE, WINDOW
_check_current_file()
with open(str(CURRENT_FILE)) as file:
total_lines = max(1, sum(1 for _ in file))
CURRENT_LINE = _clamp(CURRENT_LINE - WINDOW, 1, total_lines)
output = _cur_file_header(CURRENT_FILE, total_lines)
output += _print_window(
CURRENT_FILE, CURRENT_LINE, WINDOW, return_str=True, ignore_window=True
)
print(output)
class LineNumberError(Exception):
pass
def search_dir(search_term: str, dir_path: str = './') -> None:
"""Searches for search_term in all files in dir. If dir is not provided, searches in the current directory.
Args:
search_term: str: The term to search for.
dir_path: str: The path to the directory to search.
"""
if not os.path.isdir(dir_path):
_output_error(f'Directory {dir_path} not found')
return
matches = []
for root, _, files in os.walk(dir_path):
for file in files:
if file.startswith('.'):
continue
file_path = os.path.join(root, file)
with open(file_path, 'r', errors='ignore') as f:
for line_num, line in enumerate(f, 1):
if search_term in line:
matches.append((file_path, line_num, line.strip()))
if not matches:
print(f'No matches found for "{search_term}" in {dir_path}')
return
num_matches = len(matches)
num_files = len(set(match[0] for match in matches))
if num_files > 100:
print(
f'More than {num_files} files matched for "{search_term}" in {dir_path}. Please narrow your search.'
)
return
print(f'[Found {num_matches} matches for "{search_term}" in {dir_path}]')
for file_path, line_num, line in matches:
print(f'{file_path} (Line {line_num}): {line}')
print(f'[End of matches for "{search_term}" in {dir_path}]')
def search_file(search_term: str, file_path: str | None = None) -> None:
"""Searches for search_term in file. If file is not provided, searches in the current open file.
Args:
search_term: str: The term to search for.
file_path: str | None: The path to the file to search.
"""
global CURRENT_FILE
if file_path is None:
file_path = CURRENT_FILE
if file_path is None:
_output_error('No file specified or open. Use the open_file function first.')
return
if not os.path.isfile(file_path):
_output_error(f'File {file_path} not found.')
return
matches = []
with open(file_path) as file:
for i, line in enumerate(file, 1):
if search_term in line:
matches.append((i, line.strip()))
if matches:
print(f'[Found {len(matches)} matches for "{search_term}" in {file_path}]')
for match in matches:
print(f'Line {match[0]}: {match[1]}')
print(f'[End of matches for "{search_term}" in {file_path}]')
else:
print(f'[No matches found for "{search_term}" in {file_path}]')
def find_file(file_name: str, dir_path: str = './') -> None:
"""Finds all files with the given name in the specified directory.
Args:
file_name: str: The name of the file to find.
dir_path: str: The path to the directory to search.
"""
if not os.path.isdir(dir_path):
_output_error(f'Directory {dir_path} not found')
return
matches = []
for root, _, files in os.walk(dir_path):
for file in files:
if file_name in file:
matches.append(os.path.join(root, file))
if matches:
print(f'[Found {len(matches)} matches for "{file_name}" in {dir_path}]')
for match in matches:
print(f'{match}')
print(f'[End of matches for "{file_name}" in {dir_path}]')
else:
print(f'[No matches found for "{file_name}" in {dir_path}]')
__all__ = [
'open_file',
'goto_line',
'scroll_down',
'scroll_up',
'search_dir',
'search_file',
'find_file',
]

View File

@@ -1,7 +0,0 @@
from openhands.runtime.plugins.agent_skills.file_reader import file_readers
from openhands.runtime.plugins.agent_skills.utils.dependency import import_functions
import_functions(
module=file_readers, function_names=file_readers.__all__, target_globals=globals()
)
__all__ = file_readers.__all__

View File

@@ -1,244 +0,0 @@
"""File reader skills for the OpenHands agent.
This module provides various functions to parse and extract content from different file types,
including PDF, DOCX, LaTeX, audio, image, video, and PowerPoint files. It utilizes different
libraries and APIs to process these files and output their content or descriptions.
Functions:
parse_pdf(file_path: str) -> None: Parse and print content of a PDF file.
parse_docx(file_path: str) -> None: Parse and print content of a DOCX file.
parse_latex(file_path: str) -> None: Parse and print content of a LaTeX file.
parse_audio(file_path: str, model: str = 'whisper-1') -> None: Transcribe and print content of an audio file.
parse_image(file_path: str, task: str = 'Describe this image as detail as possible.') -> None: Analyze and print description of an image file.
parse_video(file_path: str, task: str = 'Describe this image as detail as possible.', frame_interval: int = 30) -> None: Analyze and print description of video frames.
parse_pptx(file_path: str) -> None: Parse and print content of a PowerPoint file.
Note:
Some functions (parse_audio, parse_video, parse_image) require OpenAI API credentials
and are only available if the necessary environment variables are set.
"""
import base64
import docx
import PyPDF2
from pptx import Presentation
from pylatexenc.latex2text import LatexNodes2Text
from openhands.runtime.plugins.agent_skills.utils.config import (
_get_max_token,
_get_openai_api_key,
_get_openai_base_url,
_get_openai_client,
_get_openai_model,
)
def parse_pdf(file_path: str) -> None:
"""Parses the content of a PDF file and prints it.
Args:
file_path: str: The path to the file to open.
"""
print(f'[Reading PDF file from {file_path}]')
content = PyPDF2.PdfReader(file_path)
text = ''
for page_idx in range(len(content.pages)):
text += (
f'@@ Page {page_idx + 1} @@\n'
+ content.pages[page_idx].extract_text()
+ '\n\n'
)
print(text.strip())
def parse_docx(file_path: str) -> None:
"""Parses the content of a DOCX file and prints it.
Args:
file_path: str: The path to the file to open.
"""
print(f'[Reading DOCX file from {file_path}]')
content = docx.Document(file_path)
text = ''
for i, para in enumerate(content.paragraphs):
text += f'@@ Page {i + 1} @@\n' + para.text + '\n\n'
print(text)
def parse_latex(file_path: str) -> None:
"""Parses the content of a LaTex file and prints it.
Args:
file_path: str: The path to the file to open.
"""
print(f'[Reading LaTex file from {file_path}]')
with open(file_path) as f:
data = f.read()
text = LatexNodes2Text().latex_to_text(data)
print(text.strip())
def _base64_img(file_path: str) -> str:
with open(file_path, 'rb') as image_file:
encoded_image = base64.b64encode(image_file.read()).decode('utf-8')
return encoded_image
def _base64_video(file_path: str, frame_interval: int = 10) -> list[str]:
import cv2
video = cv2.VideoCapture(file_path)
base64_frames = []
frame_count = 0
while video.isOpened():
success, frame = video.read()
if not success:
break
if frame_count % frame_interval == 0:
_, buffer = cv2.imencode('.jpg', frame)
base64_frames.append(base64.b64encode(buffer).decode('utf-8'))
frame_count += 1
video.release()
return base64_frames
def _prepare_image_messages(task: str, base64_image: str):
return [
{
'role': 'user',
'content': [
{'type': 'text', 'text': task},
{
'type': 'image_url',
'image_url': {'url': f'data:image/jpeg;base64,{base64_image}'},
},
],
}
]
def parse_audio(file_path: str, model: str = 'whisper-1') -> None:
"""Parses the content of an audio file and prints it.
Args:
file_path: str: The path to the audio file to transcribe.
model: str: The audio model to use for transcription. Defaults to 'whisper-1'.
"""
print(f'[Transcribing audio file from {file_path}]')
try:
# TODO: record the COST of the API call
with open(file_path, 'rb') as audio_file:
transcript = _get_openai_client().audio.translations.create(
model=model, file=audio_file
)
print(transcript.text)
except Exception as e:
print(f'Error transcribing audio file: {e}')
def parse_image(
file_path: str, task: str = 'Describe this image as detail as possible.'
) -> None:
"""Parses the content of an image file and prints the description.
Args:
file_path: str: The path to the file to open.
task: str: The task description for the API call. Defaults to 'Describe this image as detail as possible.'.
"""
print(f'[Reading image file from {file_path}]')
# TODO: record the COST of the API call
try:
base64_image = _base64_img(file_path)
response = _get_openai_client().chat.completions.create(
model=_get_openai_model(),
messages=_prepare_image_messages(task, base64_image),
max_tokens=_get_max_token(),
)
content = response.choices[0].message.content
print(content)
except Exception as error:
print(f'Error with the request: {error}')
def parse_video(
file_path: str,
task: str = 'Describe this image as detail as possible.',
frame_interval: int = 30,
) -> None:
"""Parses the content of an image file and prints the description.
Args:
file_path: str: The path to the video file to open.
task: str: The task description for the API call. Defaults to 'Describe this image as detail as possible.'.
frame_interval: int: The interval between frames to analyze. Defaults to 30.
"""
print(
f'[Processing video file from {file_path} with frame interval {frame_interval}]'
)
task = task or 'This is one frame from a video, please summarize this frame.'
base64_frames = _base64_video(file_path)
selected_frames = base64_frames[::frame_interval]
if len(selected_frames) > 30:
new_interval = len(base64_frames) // 30
selected_frames = base64_frames[::new_interval]
print(f'Totally {len(selected_frames)} would be analyze...\n')
idx = 0
for base64_frame in selected_frames:
idx += 1
print(f'Process the {file_path}, current No. {idx * frame_interval} frame...')
# TODO: record the COST of the API call
try:
response = _get_openai_client().chat.completions.create(
model=_get_openai_model(),
messages=_prepare_image_messages(task, base64_frame),
max_tokens=_get_max_token(),
)
content = response.choices[0].message.content
current_frame_content = f"Frame {idx}'s content: {content}\n"
print(current_frame_content)
except Exception as error:
print(f'Error with the request: {error}')
def parse_pptx(file_path: str) -> None:
"""Parses the content of a pptx file and prints it.
Args:
file_path: str: The path to the file to open.
"""
print(f'[Reading PowerPoint file from {file_path}]')
try:
pres = Presentation(str(file_path))
text = []
for slide_idx, slide in enumerate(pres.slides):
text.append(f'@@ Slide {slide_idx + 1} @@')
for shape in slide.shapes:
if hasattr(shape, 'text'):
text.append(shape.text)
print('\n'.join(text))
except Exception as e:
print(f'Error reading PowerPoint file: {e}')
__all__ = [
'parse_pdf',
'parse_docx',
'parse_latex',
'parse_pptx',
]
# This is called from OpenHands's side
# If SANDBOX_ENV_OPENAI_API_KEY is set, we will be able to use these tools in the sandbox environment
if _get_openai_api_key() and _get_openai_base_url():
__all__ += ['parse_audio', 'parse_video', 'parse_image']

View File

@@ -1,30 +0,0 @@
import os
from openai import OpenAI
# ==================================================================================================
# OPENAI
# TODO: Move this to EventStream Actions when DockerRuntime is fully implemented
# NOTE: we need to get env vars inside functions because they will be set in IPython
# AFTER the agentskills is imported (the case for DockerRuntime)
# ==================================================================================================
def _get_openai_api_key():
return os.getenv('OPENAI_API_KEY', os.getenv('SANDBOX_ENV_OPENAI_API_KEY', ''))
def _get_openai_base_url():
return os.getenv('OPENAI_BASE_URL', 'https://api.openai.com/v1')
def _get_openai_model():
return os.getenv('OPENAI_MODEL', 'gpt-4o')
def _get_max_token():
return os.getenv('MAX_TOKEN', 500)
def _get_openai_client():
client = OpenAI(api_key=_get_openai_api_key(), base_url=_get_openai_base_url())
return client

View File

@@ -1,11 +0,0 @@
from types import ModuleType
def import_functions(
module: ModuleType, function_names: list[str], target_globals: dict
) -> None:
for name in function_names:
if hasattr(module, name):
target_globals[name] = getattr(module, name)
else:
raise ValueError(f'Function {name} not found in {module.__name__}')

View File

@@ -49,10 +49,7 @@ bashlex = "^0.18"
pyjwt = "^2.9.0"
dirhash = "*"
python-frontmatter = "^1.1.0"
python-docx = "*"
PyPDF2 = "*"
python-pptx = "*"
pylatexenc = "*"
tornado = "*"
python-dotenv = "*"
pylcs = "^0.1.1"

View File

@@ -1,717 +0,0 @@
import contextlib
import io
import sys
import docx
import pytest
from openhands.runtime.plugins.agent_skills.file_ops.file_ops import (
WINDOW,
_print_window,
find_file,
goto_line,
open_file,
scroll_down,
scroll_up,
search_dir,
search_file,
)
from openhands.runtime.plugins.agent_skills.file_reader.file_readers import (
parse_docx,
parse_latex,
parse_pdf,
parse_pptx,
)
# CURRENT_FILE must be reset for each test
@pytest.fixture(autouse=True)
def reset_current_file():
from openhands.runtime.plugins.agent_skills import agentskills
agentskills.CURRENT_FILE = None
def _numbered_test_lines(start, end) -> str:
return ('\n'.join(f'{i}|' for i in range(start, end + 1))) + '\n'
def _generate_test_file_with_lines(temp_path, num_lines) -> str:
file_path = temp_path / 'test_file.py'
file_path.write_text('\n' * num_lines)
return file_path
def _generate_ruby_test_file_with_lines(temp_path, num_lines) -> str:
file_path = temp_path / 'test_file.rb'
file_path.write_text('\n' * num_lines)
return file_path
def _calculate_window_bounds(current_line, total_lines, window_size):
"""Calculate the bounds of the window around the current line."""
half_window = window_size // 2
if current_line - half_window < 0:
start = 1
end = window_size
else:
start = current_line - half_window
end = current_line + half_window
return start, end
def _capture_file_operation_error(operation, expected_error_msg):
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
operation()
result = buf.getvalue().strip()
assert result == expected_error_msg
SEP = '-' * 49 + '\n'
# =============================================================================
def test_open_file_unexist_path():
_capture_file_operation_error(
lambda: open_file('/unexist/path/a.txt'),
'ERROR: File /unexist/path/a.txt not found.',
)
def test_open_file(tmp_path):
assert tmp_path is not None
temp_file_path = tmp_path / 'a.txt'
temp_file_path.write_text('Line 1\nLine 2\nLine 3\nLine 4\nLine 5')
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
open_file(str(temp_file_path))
result = buf.getvalue()
assert result is not None
expected = (
f'[File: {temp_file_path} (5 lines total)]\n'
'(this is the beginning of the file)\n'
'1|Line 1\n'
'2|Line 2\n'
'3|Line 3\n'
'4|Line 4\n'
'5|Line 5\n'
'(this is the end of the file)\n'
)
assert result.split('\n') == expected.split('\n')
def test_open_file_with_indentation(tmp_path):
temp_file_path = tmp_path / 'a.txt'
temp_file_path.write_text('Line 1\n Line 2\nLine 3\nLine 4\nLine 5')
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
open_file(str(temp_file_path))
result = buf.getvalue()
assert result is not None
expected = (
f'[File: {temp_file_path} (5 lines total)]\n'
'(this is the beginning of the file)\n'
'1|Line 1\n'
'2| Line 2\n'
'3|Line 3\n'
'4|Line 4\n'
'5|Line 5\n'
'(this is the end of the file)\n'
)
assert result.split('\n') == expected.split('\n')
def test_open_file_long(tmp_path):
temp_file_path = tmp_path / 'a.txt'
content = '\n'.join([f'Line {i}' for i in range(1, 1001)])
temp_file_path.write_text(content)
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
open_file(str(temp_file_path), 1, 50)
result = buf.getvalue()
assert result is not None
expected = f'[File: {temp_file_path} (1000 lines total)]\n'
expected += '(this is the beginning of the file)\n'
for i in range(1, 51):
expected += f'{i}|Line {i}\n'
expected += '(950 more lines below)\n'
expected += '[Use `scroll_down` to view the next 100 lines of the file!]\n'
assert result.split('\n') == expected.split('\n')
def test_open_file_long_with_lineno(tmp_path):
temp_file_path = tmp_path / 'a.txt'
content = '\n'.join([f'Line {i}' for i in range(1, 1001)])
temp_file_path.write_text(content)
cur_line = 100
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
open_file(str(temp_file_path), cur_line)
result = buf.getvalue()
assert result is not None
expected = f'[File: {temp_file_path} (1000 lines total)]\n'
# since 100 is < WINDOW and 100 - WINDOW//2 < 0, so it should show all lines from 1 to WINDOW
start, end = _calculate_window_bounds(cur_line, 1000, WINDOW)
if start == 1:
expected += '(this is the beginning of the file)\n'
else:
expected += f'({start - 1} more lines above)\n'
for i in range(start, end + 1):
expected += f'{i}|Line {i}\n'
if end == 1000:
expected += '(this is the end of the file)\n'
else:
expected += f'({1000 - end} more lines below)\n'
expected += '[Use `scroll_down` to view the next 100 lines of the file!]\n'
assert result.split('\n') == expected.split('\n')
def test_goto_line(tmp_path):
temp_file_path = tmp_path / 'a.txt'
total_lines = 1000
content = '\n'.join([f'Line {i}' for i in range(1, total_lines + 1)])
temp_file_path.write_text(content)
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
open_file(str(temp_file_path))
result = buf.getvalue()
assert result is not None
expected = f'[File: {temp_file_path} ({total_lines} lines total)]\n'
expected += '(this is the beginning of the file)\n'
for i in range(1, WINDOW + 1):
expected += f'{i}|Line {i}\n'
expected += f'({total_lines - WINDOW} more lines below)\n'
expected += '[Use `scroll_down` to view the next 100 lines of the file!]\n'
assert result.split('\n') == expected.split('\n')
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
goto_line(500)
result = buf.getvalue()
assert result is not None
cur_line = 500
expected = f'[File: {temp_file_path} ({total_lines} lines total)]\n'
start, end = _calculate_window_bounds(cur_line, total_lines, WINDOW)
if start == 1:
expected += '(this is the beginning of the file)\n'
else:
expected += f'({start - 1} more lines above)\n'
for i in range(start, end + 1):
expected += f'{i}|Line {i}\n'
if end == total_lines:
expected += '(this is the end of the file)\n'
else:
expected += f'({total_lines - end} more lines below)\n'
assert result.split('\n') == expected.split('\n')
def test_goto_line_negative(tmp_path):
temp_file_path = tmp_path / 'a.txt'
content = '\n'.join([f'Line {i}' for i in range(1, 5)])
temp_file_path.write_text(content)
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
open_file(str(temp_file_path))
_capture_file_operation_error(
lambda: goto_line(-1), 'ERROR: Line number must be between 1 and 4.'
)
def test_goto_line_out_of_bound(tmp_path):
temp_file_path = tmp_path / 'a.txt'
content = '\n'.join([f'Line {i}' for i in range(1, 10)])
temp_file_path.write_text(content)
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
open_file(str(temp_file_path))
_capture_file_operation_error(
lambda: goto_line(100), 'ERROR: Line number must be between 1 and 9.'
)
def test_scroll_down(tmp_path):
temp_file_path = tmp_path / 'a.txt'
total_lines = 1000
content = '\n'.join([f'Line {i}' for i in range(1, total_lines + 1)])
temp_file_path.write_text(content)
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
open_file(str(temp_file_path))
result = buf.getvalue()
assert result is not None
expected = f'[File: {temp_file_path} ({total_lines} lines total)]\n'
start, end = _calculate_window_bounds(1, total_lines, WINDOW)
if start == 1:
expected += '(this is the beginning of the file)\n'
else:
expected += f'({start - 1} more lines above)\n'
for i in range(start, end + 1):
expected += f'{i}|Line {i}\n'
if end == total_lines:
expected += '(this is the end of the file)\n'
else:
expected += f'({total_lines - end} more lines below)\n'
expected += '[Use `scroll_down` to view the next 100 lines of the file!]\n'
assert result.split('\n') == expected.split('\n')
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
scroll_down()
result = buf.getvalue()
assert result is not None
expected = f'[File: {temp_file_path} ({total_lines} lines total)]\n'
start = WINDOW + 1
end = 2 * WINDOW + 1
if start == 1:
expected += '(this is the beginning of the file)\n'
else:
expected += f'({start - 1} more lines above)\n'
for i in range(start, end + 1):
expected += f'{i}|Line {i}\n'
if end == total_lines:
expected += '(this is the end of the file)\n'
else:
expected += f'({total_lines - end} more lines below)\n'
assert result.split('\n') == expected.split('\n')
def test_scroll_up(tmp_path):
temp_file_path = tmp_path / 'a.txt'
total_lines = 1000
content = '\n'.join([f'Line {i}' for i in range(1, total_lines + 1)])
temp_file_path.write_text(content)
cur_line = 300
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
open_file(str(temp_file_path), cur_line)
result = buf.getvalue()
assert result is not None
expected = f'[File: {temp_file_path} ({total_lines} lines total)]\n'
start, end = _calculate_window_bounds(cur_line, total_lines, WINDOW)
if start == 1:
expected += '(this is the beginning of the file)\n'
else:
expected += f'({start - 1} more lines above)\n'
for i in range(start, end + 1):
expected += f'{i}|Line {i}\n'
if end == total_lines:
expected += '(this is the end of the file)\n'
else:
expected += f'({total_lines - end} more lines below)\n'
expected += '[Use `scroll_down` to view the next 100 lines of the file!]\n'
assert result.split('\n') == expected.split('\n')
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
scroll_up()
result = buf.getvalue()
assert result is not None
cur_line = cur_line - WINDOW
expected = f'[File: {temp_file_path} ({total_lines} lines total)]\n'
start = cur_line
end = cur_line + WINDOW
if start == 1:
expected += '(this is the beginning of the file)\n'
else:
expected += f'({start - 1} more lines above)\n'
for i in range(start, end + 1):
expected += f'{i}|Line {i}\n'
if end == total_lines:
expected += '(this is the end of the file)\n'
else:
expected += f'({total_lines - end} more lines below)\n'
assert result.split('\n') == expected.split('\n')
def test_scroll_down_edge(tmp_path):
temp_file_path = tmp_path / 'a.txt'
content = '\n'.join([f'Line {i}' for i in range(1, 10)])
temp_file_path.write_text(content)
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
open_file(str(temp_file_path))
result = buf.getvalue()
assert result is not None
expected = f'[File: {temp_file_path} (9 lines total)]\n'
expected += '(this is the beginning of the file)\n'
for i in range(1, 10):
expected += f'{i}|Line {i}\n'
expected += '(this is the end of the file)\n'
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
scroll_down()
result = buf.getvalue()
assert result is not None
# expected should be unchanged
assert result.split('\n') == expected.split('\n')
def test_print_window_internal(tmp_path):
test_file_path = tmp_path / 'a.txt'
test_file_path.write_text('')
open_file(str(test_file_path))
with open(test_file_path, 'w') as file:
for i in range(1, 101):
file.write(f'Line `{i}`\n')
# Define the parameters for the test
current_line = 50
window = 2
# Test _print_window especially with backticks
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
_print_window(str(test_file_path), current_line, window, return_str=False)
result = buf.getvalue()
expected = (
'(48 more lines above)\n'
'49|Line `49`\n'
'50|Line `50`\n'
'51|Line `51`\n'
'(49 more lines below)\n'
)
assert result == expected
def test_open_file_large_line_number(tmp_path):
test_file_path = tmp_path / 'a.txt'
test_file_path.write_text('')
open_file(str(test_file_path))
with open(test_file_path, 'w') as file:
for i in range(1, 1000):
file.write(f'Line `{i}`\n')
# Define the parameters for the test
current_line = 800
window = 100
# Test _print_window especially with backticks
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
# _print_window(str(test_file_path), current_line, window, return_str=False)
open_file(str(test_file_path), current_line, window)
result = buf.getvalue()
expected = f'[File: {test_file_path} (999 lines total)]\n'
expected += '(749 more lines above)\n'
for i in range(750, 850 + 1):
expected += f'{i}|Line `{i}`\n'
expected += '(149 more lines below)\n'
expected += '[Use `scroll_down` to view the next 100 lines of the file!]\n'
assert result == expected
def test_search_dir(tmp_path):
# create files with the search term "bingo"
for i in range(1, 101):
temp_file_path = tmp_path / f'a{i}.txt'
with open(temp_file_path, 'w') as file:
file.write('Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n')
if i == 50:
file.write('bingo')
# test
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
search_dir('bingo', str(tmp_path))
result = buf.getvalue()
assert result is not None
expected = (
f'[Found 1 matches for "bingo" in {tmp_path}]\n'
f'{tmp_path}/a50.txt (Line 6): bingo\n'
f'[End of matches for "bingo" in {tmp_path}]\n'
)
assert result.split('\n') == expected.split('\n')
def test_search_dir_not_exist_term(tmp_path):
# create files with the search term "bingo"
for i in range(1, 101):
temp_file_path = tmp_path / f'a{i}.txt'
with open(temp_file_path, 'w') as file:
file.write('Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n')
# test
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
search_dir('non-exist', str(tmp_path))
result = buf.getvalue()
assert result is not None
expected = f'No matches found for "non-exist" in {tmp_path}\n'
assert result.split('\n') == expected.split('\n')
def test_search_dir_too_much_match(tmp_path):
# create files with the search term "Line 5"
for i in range(1, 1000):
temp_file_path = tmp_path / f'a{i}.txt'
with open(temp_file_path, 'w') as file:
file.write('Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n')
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
search_dir('Line 5', str(tmp_path))
result = buf.getvalue()
assert result is not None
expected = f'More than 999 files matched for "Line 5" in {tmp_path}. Please narrow your search.\n'
assert result.split('\n') == expected.split('\n')
def test_search_dir_cwd(tmp_path, monkeypatch):
# Using pytest's monkeypatch to change directory without affecting other tests
monkeypatch.chdir(tmp_path)
# create files with the search term "bingo"
for i in range(1, 101):
temp_file_path = tmp_path / f'a{i}.txt'
with open(temp_file_path, 'w') as file:
file.write('Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n')
if i == 50:
file.write('bingo')
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
search_dir('bingo')
result = buf.getvalue()
assert result is not None
expected = (
'[Found 1 matches for "bingo" in ./]\n'
'./a50.txt (Line 6): bingo\n'
'[End of matches for "bingo" in ./]\n'
)
assert result.split('\n') == expected.split('\n')
def test_search_file(tmp_path):
temp_file_path = tmp_path / 'a.txt'
temp_file_path.write_text('Line 1\nLine 2\nLine 3\nLine 4\nLine 5')
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
search_file('Line 5', str(temp_file_path))
result = buf.getvalue()
assert result is not None
expected = f'[Found 1 matches for "Line 5" in {temp_file_path}]\n'
expected += 'Line 5: Line 5\n'
expected += f'[End of matches for "Line 5" in {temp_file_path}]\n'
assert result.split('\n') == expected.split('\n')
def test_search_file_not_exist_term(tmp_path):
temp_file_path = tmp_path / 'a.txt'
temp_file_path.write_text('Line 1\nLine 2\nLine 3\nLine 4\nLine 5')
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
search_file('Line 6', str(temp_file_path))
result = buf.getvalue()
assert result is not None
expected = f'[No matches found for "Line 6" in {temp_file_path}]\n'
assert result.split('\n') == expected.split('\n')
def test_search_file_not_exist_file():
_capture_file_operation_error(
lambda: search_file('Line 6', '/unexist/path/a.txt'),
'ERROR: File /unexist/path/a.txt not found.',
)
def test_find_file(tmp_path):
temp_file_path = tmp_path / 'a.txt'
temp_file_path.write_text('Line 1\nLine 2\nLine 3\nLine 4\nLine 5')
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
find_file('a.txt', str(tmp_path))
result = buf.getvalue()
assert result is not None
expected = f'[Found 1 matches for "a.txt" in {tmp_path}]\n'
expected += f'{tmp_path}/a.txt\n'
expected += f'[End of matches for "a.txt" in {tmp_path}]\n'
assert result.split('\n') == expected.split('\n')
def test_find_file_cwd(tmp_path, monkeypatch):
monkeypatch.chdir(tmp_path)
temp_file_path = tmp_path / 'a.txt'
temp_file_path.write_text('Line 1\nLine 2\nLine 3\nLine 4\nLine 5')
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
find_file('a.txt')
result = buf.getvalue()
assert result is not None
def test_find_file_not_exist_file():
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
find_file('nonexist.txt')
result = buf.getvalue()
assert result is not None
expected = '[No matches found for "nonexist.txt" in ./]\n'
assert result.split('\n') == expected.split('\n')
def test_find_file_not_exist_file_specific_path(tmp_path):
with io.StringIO() as buf:
with contextlib.redirect_stdout(buf):
find_file('nonexist.txt', str(tmp_path))
result = buf.getvalue()
assert result is not None
expected = f'[No matches found for "nonexist.txt" in {tmp_path}]\n'
assert result.split('\n') == expected.split('\n')
def test_parse_docx(tmp_path):
# Create a DOCX file with some content
test_docx_path = tmp_path / 'test.docx'
doc = docx.Document()
doc.add_paragraph('Hello, this is a test document.')
doc.add_paragraph('This is the second paragraph.')
doc.save(str(test_docx_path))
old_stdout = sys.stdout
sys.stdout = io.StringIO()
# Call the parse_docx function
parse_docx(str(test_docx_path))
# Capture the output
output = sys.stdout.getvalue()
sys.stdout = old_stdout
# Check if the output is correct
expected_output = (
f'[Reading DOCX file from {test_docx_path}]\n'
'@@ Page 1 @@\nHello, this is a test document.\n\n'
'@@ Page 2 @@\nThis is the second paragraph.\n\n\n'
)
assert output == expected_output, f'Expected output does not match. Got: {output}'
def test_parse_latex(tmp_path):
# Create a LaTeX file with some content
test_latex_path = tmp_path / 'test.tex'
with open(test_latex_path, 'w') as f:
f.write(r"""
\documentclass{article}
\begin{document}
Hello, this is a test LaTeX document.
\end{document}
""")
old_stdout = sys.stdout
sys.stdout = io.StringIO()
# Call the parse_latex function
parse_latex(str(test_latex_path))
# Capture the output
output = sys.stdout.getvalue()
sys.stdout = old_stdout
# Check if the output is correct
expected_output = (
f'[Reading LaTex file from {test_latex_path}]\n'
'Hello, this is a test LaTeX document.\n'
)
assert output == expected_output, f'Expected output does not match. Got: {output}'
def test_parse_pdf(tmp_path):
# Create a PDF file with some content
test_pdf_path = tmp_path / 'test.pdf'
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
c = canvas.Canvas(str(test_pdf_path), pagesize=letter)
c.drawString(100, 750, 'Hello, this is a test PDF document.')
c.save()
old_stdout = sys.stdout
sys.stdout = io.StringIO()
# Call the parse_pdf function
parse_pdf(str(test_pdf_path))
# Capture the output
output = sys.stdout.getvalue()
sys.stdout = old_stdout
# Check if the output is correct
expected_output = (
f'[Reading PDF file from {test_pdf_path}]\n'
'@@ Page 1 @@\n'
'Hello, this is a test PDF document.\n'
)
assert output == expected_output, f'Expected output does not match. Got: {output}'
def test_parse_pptx(tmp_path):
test_pptx_path = tmp_path / 'test.pptx'
from pptx import Presentation
pres = Presentation()
slide1 = pres.slides.add_slide(pres.slide_layouts[0])
title1 = slide1.shapes.title
title1.text = 'Hello, this is the first test PPTX slide.'
slide2 = pres.slides.add_slide(pres.slide_layouts[0])
title2 = slide2.shapes.title
title2.text = 'Hello, this is the second test PPTX slide.'
pres.save(str(test_pptx_path))
old_stdout = sys.stdout
sys.stdout = io.StringIO()
parse_pptx(str(test_pptx_path))
output = sys.stdout.getvalue()
sys.stdout = old_stdout
expected_output = (
f'[Reading PowerPoint file from {test_pptx_path}]\n'
'@@ Slide 1 @@\n'
'Hello, this is the first test PPTX slide.\n\n'
'@@ Slide 2 @@\n'
'Hello, this is the second test PPTX slide.\n\n'
)
assert output == expected_output, f'Expected output does not match. Got: {output}'