Files
OpenHands/openhands/runtime/utils/git_handler.py

293 lines
10 KiB
Python

from dataclasses import dataclass
from typing import Callable
from uuid import uuid4
@dataclass
class CommandResult:
"""
Represents the result of a shell command execution.
Attributes:
content (str): The output content of the command.
exit_code (int): The exit code of the command execution.
"""
content: str
exit_code: int
class GitHandler:
"""
A handler for executing Git-related operations via shell commands.
"""
def __init__(
self,
execute_shell_fn: Callable[[str, str | None], CommandResult],
):
self.execute = execute_shell_fn
self.cwd: str | None = None
def set_cwd(self, cwd: str) -> None:
"""
Sets the current working directory for Git operations.
Args:
cwd (str): The directory path.
"""
self.cwd = cwd
def _is_git_repo(self) -> bool:
"""
Checks if the current directory is a Git repository.
Returns:
bool: True if inside a Git repository, otherwise False.
"""
cmd = 'git --no-pager rev-parse --is-inside-work-tree'
output = self.execute(cmd, self.cwd)
return output.content.strip() == 'true'
def _get_current_file_content(self, file_path: str) -> str:
"""
Retrieves the current content of a given file.
Args:
file_path (str): Path to the file.
Returns:
str: The file content.
"""
output = self.execute(f'cat {file_path}', self.cwd)
return output.content
def _verify_ref_exists(self, ref: str) -> bool:
"""
Verifies whether a specific Git reference exists.
Args:
ref (str): The Git reference to check.
Returns:
bool: True if the reference exists, otherwise False.
"""
cmd = f'git --no-pager rev-parse --verify {ref}'
output = self.execute(cmd, self.cwd)
return output.exit_code == 0
def _get_ref_content(self, file_path: str) -> str:
"""
Retrieves the content of a file from a valid Git reference.
Finds the git repository closest to the file in the tree and executes the command in that context.
Args:
file_path (str): The file path in the repository.
Returns:
str: The content of the file from the reference, or an empty string if unavailable.
"""
if not self.cwd:
return ''
unique_id = uuid4().hex
# Single bash command that finds the closest git repository to the file and gets the ref content
cmd = f"""bash -c '
# Convert to absolute path
file_path="$(realpath "{file_path}")"
# Find the closest git repository by walking up the directory tree
current_dir="$(dirname "$file_path")"
git_repo_dir=""
while [[ "$current_dir" != "/" ]]; do
if [[ -d "$current_dir/.git" ]] || git -C "$current_dir" rev-parse --git-dir >/dev/null 2>&1; then
git_repo_dir="$current_dir"
break
fi
current_dir="$(dirname "$current_dir")"
done
# If no git repository found, exit
if [[ -z "$git_repo_dir" ]]; then
exit 1
fi
# Get the file path relative to the git repository root
repo_root="$(cd "$git_repo_dir" && git rev-parse --show-toplevel)"
relative_file_path="${{file_path#${{repo_root}}/}}"
# Function to get current branch
get_current_branch() {{
git -C "$git_repo_dir" rev-parse --abbrev-ref HEAD 2>/dev/null
}}
# Function to get default branch
get_default_branch() {{
git -C "$git_repo_dir" remote show origin 2>/dev/null | grep "HEAD branch" | awk "{{print \\$NF}}" || echo "main"
}}
# Function to verify if a ref exists
verify_ref_exists() {{
git -C "$git_repo_dir" rev-parse --verify "$1" >/dev/null 2>&1
}}
# Get valid reference for comparison
current_branch="$(get_current_branch)"
default_branch="$(get_default_branch)"
# Check if origin remote exists
has_origin="$(git -C "$git_repo_dir" remote | grep -q "^origin$" && echo "true" || echo "false")"
if [[ "$has_origin" == "true" ]]; then
ref_current_branch="origin/$current_branch"
ref_non_default_branch="$(git -C "$git_repo_dir" merge-base HEAD "$(git -C "$git_repo_dir" rev-parse --abbrev-ref origin/$default_branch)" 2>/dev/null || echo "")"
ref_default_branch="origin/$default_branch"
else
# For repositories without origin, try HEAD~1 (previous commit) or empty tree
ref_current_branch="HEAD~1"
ref_non_default_branch=""
ref_default_branch=""
fi
ref_new_repo="$(git -C "$git_repo_dir" rev-parse --verify 4b825dc642cb6eb9a060e54bf8d69288fbee4904 2>/dev/null || echo "")" # empty tree
# Try refs in order of preference
valid_ref=""
for ref in "$ref_current_branch" "$ref_non_default_branch" "$ref_default_branch" "$ref_new_repo"; do
if [[ -n "$ref" ]] && verify_ref_exists "$ref"; then
valid_ref="$ref"
break
fi
done
# If no valid ref found, exit
if [[ -z "$valid_ref" ]]; then
exit 1
fi
# Get the file content from the reference
git -C "$git_repo_dir" show "$valid_ref:$relative_file_path" 2>/dev/null || exit 1
# {unique_id}'"""
result = self.execute(cmd, self.cwd)
if result.exit_code != 0:
return ''
# TODO: The command echoes the bash script. Why?
content = result.content.split(f'{unique_id}')[-1]
return content
def get_git_changes(self) -> list[dict[str, str]] | None:
"""
Retrieves the list of changed files in Git repositories.
Examines each direct subdirectory of the workspace directory looking for git repositories
and returns the changes for each of these directories.
Optimized to use a single git command per repository for maximum performance.
Returns:
list[dict[str, str]] | None: A list of dictionaries containing file paths and statuses. None if no git repositories found.
"""
# If cwd is not set, return None
if not self.cwd:
return None
# Single bash command that:
# 1. Creates a list of directories to check (current dir + direct subdirectories)
# 2. For each directory, checks if it's a git repo and gets status
# 3. Outputs in format: REPO_PATH|STATUS|FILE_PATH
cmd = """bash -c '
{
# Check current directory first
echo "."
# List direct subdirectories (excluding hidden ones)
find . -maxdepth 1 -type d ! -name ".*" ! -name "." 2>/dev/null || true
} | while IFS= read -r dir; do
if [ -d "$dir/.git" ] || git -C "$dir" rev-parse --git-dir >/dev/null 2>&1; then
# Get absolute path of the directory
# Get git status for this repository
git -C "$dir" status --porcelain -uall 2>/dev/null | while IFS= read -r line; do
if [ -n "$line" ]; then
# Extract status (first 2 chars) and file path (from char 3 onwards)
status=$(echo "$line" | cut -c1-2)
file_path=$(echo "$line" | cut -c4-)
# Convert status codes to single character
case "$status" in
"M "*|" M") echo "$dir|M|$file_path" ;;
"A "*|" A") echo "$dir|A|$file_path" ;;
"D "*|" D") echo "$dir|D|$file_path" ;;
"R "*|" R") echo "$dir|R|$file_path" ;;
"C "*|" C") echo "$dir|C|$file_path" ;;
"U "*|" U") echo "$dir|U|$file_path" ;;
"??") echo "$dir|A|$file_path" ;;
*) echo "$dir|M|$file_path" ;;
esac
fi
done
fi
done
' """
result = self.execute(cmd.strip(), self.cwd)
if result.exit_code != 0 or not result.content.strip():
return None
# Parse the output
changes = []
for line in result.content.strip().split('\n'):
if '|' in line:
parts = line.split('|', 2)
if len(parts) == 3:
repo_path, status, file_path = parts
file_path = f'{repo_path}/{file_path}'[2:]
changes.append({'status': status, 'path': file_path})
return changes if changes else None
def get_git_diff(self, file_path: str) -> dict[str, str]:
"""
Retrieves the original and modified content of a file in the repository.
Args:
file_path (str): Path to the file.
Returns:
dict[str, str]: A dictionary containing the original and modified content.
"""
modified = self._get_current_file_content(file_path)
original = self._get_ref_content(file_path)
return {
'modified': modified,
'original': original,
}
def parse_git_changes(changes_list: list[str]) -> list[dict[str, str]]:
"""
Parses the list of changed files and extracts their statuses and paths.
Args:
changes_list (list[str]): List of changed file entries.
Returns:
list[dict[str, str]]: Parsed list of file changes with statuses.
"""
result = []
for line in changes_list:
status = line[:2].strip()
path = line[2:].strip()
# Get the first non-space character as the primary status
primary_status = status.replace(' ', '')[0]
result.append(
{
'status': primary_status,
'path': path,
}
)
return result