Files
AutoGPT/.github/scripts/detect_overlaps.py
Otto aab33c4e14 fix: make overlap check informational only (always green)
- Add continue-on-error: true to workflow
- Remove sys.exit(1) on conflicts
- Check is meant to inform contributors, not block merging
2026-02-13 13:07:45 +00:00

1057 lines
36 KiB
Python

#!/usr/bin/env python3
"""
PR Overlap Detection Tool
Detects potential merge conflicts between a given PR and other open PRs
by checking for file overlap, line overlap, and actual merge conflicts.
"""
import json
import os
import re
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from typing import Optional
# =============================================================================
# MAIN ENTRY POINT
# =============================================================================
def main():
    """CLI entry point: detect overlaps for one PR and report them."""
    import argparse
    parser = argparse.ArgumentParser(description="Detect PR overlaps and potential merge conflicts")
    parser.add_argument("pr_number", type=int, help="PR number to check")
    parser.add_argument("--base", default=None, help="Base branch (default: auto-detect from PR)")
    parser.add_argument("--skip-merge-test", action="store_true", help="Skip actual merge conflict testing")
    parser.add_argument("--discord-webhook", default=os.environ.get("DISCORD_WEBHOOK_URL"), help="Discord webhook URL for notifications")
    parser.add_argument("--dry-run", action="store_true", help="Don't post comments, just print")
    args = parser.parse_args()

    owner, repo = get_repo_info()
    print(f"Checking PR #{args.pr_number} in {owner}/{repo}")

    # Resolve the PR under inspection and its effective base branch.
    pr = fetch_pr_details(args.pr_number)
    base = args.base or pr.base_ref
    print(f"PR #{pr.number}: {pr.title}")
    print(f"Base branch: {base}")
    print(f"Files changed: {len(pr.files)}")

    overlaps, all_changes = find_overlapping_prs(
        owner, repo, base, pr, args.pr_number, args.skip_merge_test
    )
    if not overlaps:
        print("No overlaps detected!")
        return

    report = format_comment(overlaps, args.pr_number, pr.changed_ranges, all_changes)
    if args.dry_run:
        # Preview mode: show what would have been posted, touch nothing.
        banner = "=" * 60
        print("\n" + banner)
        print("COMMENT PREVIEW:")
        print(banner)
        print(report)
    else:
        if report:
            post_or_update_comment(args.pr_number, report)
            print("Posted comment to PR")
        if args.discord_webhook:
            send_discord_notification(args.discord_webhook, pr, overlaps)

    # Informational summary only; this check never blocks merging.
    report_results(overlaps)
# =============================================================================
# HIGH-LEVEL WORKFLOW FUNCTIONS
# =============================================================================
def fetch_pr_details(pr_number: int) -> "PullRequest":
    """Fetch a single PR's metadata via gh, plus its parsed diff ranges."""
    fields = "number,title,url,author,headRefName,baseRefName,files"
    view = run_gh(["pr", "view", str(pr_number), "--json", fields])
    info = json.loads(view.stdout)
    pr = PullRequest(
        number=info["number"],
        title=info["title"],
        author=info["author"]["login"],
        url=info["url"],
        head_ref=info["headRefName"],
        base_ref=info["baseRefName"],
        files=[entry["path"] for entry in info["files"]],
        changed_ranges={},
    )
    # Line-level ranges come from the unified diff, not the JSON payload.
    pr.changed_ranges = parse_diff_ranges(get_pr_diff(pr_number))
    return pr
def find_overlapping_prs(
    owner: str,
    repo: str,
    base_branch: str,
    current_pr: "PullRequest",
    current_pr_number: int,
    skip_merge_test: bool
) -> tuple[list["Overlap"], dict[int, dict[str, "ChangedFile"]]]:
    """Collect every open PR that overlaps the current one.

    Returns the overlaps plus, keyed by PR number, each overlapping PR's
    parsed changed-file ranges (used later for risk classification).
    """
    open_prs = query_open_prs(owner, repo, base_branch)
    others = [p for p in open_prs if p["number"] != current_pr_number]
    print(f"Found {len(others)} other open PRs targeting {base_branch}")

    candidates = find_file_overlap_candidates(current_pr.files, others)
    print(f"Found {len(candidates)} PRs with file overlap (excluding ignored files)")
    if not candidates:
        return [], {}

    found = []
    changes_by_pr = {}
    for candidate, shared in candidates:
        overlap, parsed = analyze_pr_overlap(
            owner, repo, base_branch, current_pr, candidate, shared, skip_merge_test
        )
        if overlap:
            found.append(overlap)
            changes_by_pr[candidate["number"]] = parsed
    return found, changes_by_pr
def analyze_pr_overlap(
    owner: str,
    repo: str,
    base_branch: str,
    current_pr: "PullRequest",
    other_pr_data: dict,
    shared_files: list[str],
    skip_merge_test: bool
) -> tuple[Optional["Overlap"], dict[str, "ChangedFile"]]:
    """Build an Overlap between current_pr and one candidate PR.

    Returns (overlap, other_pr_changed_ranges). overlap is None when every
    shared file is on the ignore list. When line overlaps exist and merge
    testing is enabled, the overlap also carries actual test-merge results.
    """
    # Filter out ignored files
    non_ignored_shared = [f for f in shared_files if not should_ignore_file(f)]
    if not non_ignored_shared:
        return None, {}
    # Rehydrate the raw dict (from query_open_prs) into a PullRequest.
    other_pr = PullRequest(
        number=other_pr_data["number"],
        title=other_pr_data["title"],
        author=other_pr_data["author"],
        url=other_pr_data["url"],
        head_ref=other_pr_data["head_ref"],
        base_ref=other_pr_data["base_ref"],
        files=other_pr_data["files"],
        changed_ranges={},
        updated_at=other_pr_data.get("updated_at")
    )
    # Get diff for other PR (line ranges are not in the GraphQL payload)
    other_diff = get_pr_diff(other_pr.number)
    other_pr.changed_ranges = parse_diff_ranges(other_diff)
    # Check line overlaps (find_line_overlaps skips ignored files itself)
    line_overlaps = find_line_overlaps(
        current_pr.changed_ranges,
        other_pr.changed_ranges,
        shared_files
    )
    overlap = Overlap(
        pr_a=current_pr,
        pr_b=other_pr,
        overlapping_files=non_ignored_shared,
        line_overlaps=line_overlaps
    )
    # Test for actual merge conflicts only if we have line overlaps —
    # a clone + double merge per candidate is expensive.
    if line_overlaps and not skip_merge_test:
        print(f"Testing merge conflict with PR #{other_pr.number}...", flush=True)
        has_conflict, conflict_files, conflict_details, error_type = test_merge_conflict(
            owner, repo, base_branch, current_pr, other_pr
        )
        overlap.has_merge_conflict = has_conflict
        overlap.conflict_files = conflict_files
        overlap.conflict_details = conflict_details
        overlap.conflict_type = error_type
    return overlap, other_pr.changed_ranges
def find_file_overlap_candidates(
    current_files: list[str],
    other_prs: list[dict]
) -> list[tuple[dict, list[str]]]:
    """Return (pr_data, shared_files) pairs for PRs touching common files."""
    mine = {path for path in current_files if not should_ignore_file(path)}
    matches = []
    for candidate in other_prs:
        theirs = {path for path in candidate["files"] if not should_ignore_file(path)}
        common = mine & theirs
        if common:
            matches.append((candidate, list(common)))
    return matches
def report_results(overlaps: list["Overlap"]):
    """Print an informational summary of what was found."""
    conflict_count = sum(1 for o in overlaps if o.has_merge_conflict)
    if conflict_count:
        print(f"\n⚠️ Found {conflict_count} merge conflict(s)")
    overlapping = sum(1 for o in overlaps if o.line_overlaps)
    if overlapping:
        print(f"\n⚠️ Found {overlapping} PR(s) with line overlap")
    print("\n✅ Done")
    # Deliberately no sys.exit: this check informs, it does not block merges.
# =============================================================================
# COMMENT FORMATTING
# =============================================================================
def format_comment(
    overlaps: list["Overlap"],
    current_pr: int,
    changes_current: dict[str, "ChangedFile"],
    all_changes: dict[int, dict[str, "ChangedFile"]]
) -> str:
    """Render the overlap report as GitHub-flavoured markdown."""
    if not overlaps:
        return ""
    out = [
        "## 🔍 PR Overlap Detection",
        "",
        "This check compares your PR against all other open PRs targeting the same branch to detect potential merge conflicts early.",
        "",
    ]
    # Base-branch conflicts first, then the risk-ranked sections.
    format_base_conflicts(overlaps, out)
    ranked = classify_all_overlaps(overlaps, current_pr, changes_current, all_changes)
    by_risk = {
        level: [(o, r) for o, r in ranked if r == level]
        for level in ('conflict', 'high', 'medium', 'low')
    }
    format_conflicts_section(by_risk['conflict'], current_pr, out)
    format_high_risk_section(by_risk['high'], current_pr, out)
    format_medium_risk_section(by_risk['medium'], current_pr, changes_current, all_changes, out)
    format_low_risk_section(by_risk['low'], current_pr, out)
    out.append(f"\n**Summary:** {len(by_risk['conflict'])} conflicts, {len(by_risk['high'])} high risk, {len(by_risk['medium'])} medium risk, {len(by_risk['low'])} low risk (out of {len(overlaps)} PRs with file overlap)")
    out.append("\n---\n*Auto-generated on push. Ignores: `openapi.json`, lock files.*")
    return "\n".join(out)
def format_base_conflicts(overlaps: list["Overlap"], lines: list[str]):
    """Append a section for conflicts between this PR and its base branch."""
    flagged = [o for o in overlaps if o.conflict_type == 'pr_a_conflicts_base']
    if not flagged:
        return
    lines.append("### ⚠️ This PR has conflicts with the base branch\n")
    lines.append("Conflicts will need to be resolved before merging:\n")
    # All such overlaps report the same base conflict; show the first.
    files = flagged[0].conflict_files
    lines.extend(f"- `{f}`" for f in files[:10])
    if len(files) > 10:
        lines.append(f"- ... and {len(files) - 10} more files")
    lines.append("\n")
def format_conflicts_section(conflicts: list[tuple], current_pr: int, lines: list[str]):
    """Append the tested-and-confirmed merge conflict section."""
    # Base-branch conflicts get their own section elsewhere; skip them here.
    entries = [(o, r) for o, r in conflicts if o.conflict_type != 'pr_a_conflicts_base']
    if not entries:
        return
    lines.extend([
        "### 🔴 Merge Conflicts Detected",
        "",
        "The following PRs have been tested and **will have merge conflicts** if merged after this PR. Consider coordinating with the authors.",
        "",
    ])
    for overlap, _ in entries:
        # Show whichever side of the pair is not the PR being checked.
        counterpart = overlap.pr_b if overlap.pr_a.number == current_pr else overlap.pr_a
        format_pr_entry(counterpart, lines)
        format_conflict_details(overlap, lines)
    lines.append("")
def format_high_risk_section(high_risk: list[tuple], current_pr: int, lines: list[str]):
    """Append the high-risk (>20 overlapping lines) section."""
    if not high_risk:
        return
    lines.extend([
        "### 🟠 High Risk — Significant Line Overlap",
        "",
        "These PRs modify many of the same lines (>20 lines). While not yet tested for conflicts, they have high potential to conflict.",
        "",
    ])
    for overlap, _ in high_risk:
        counterpart = overlap.pr_b if overlap.pr_a.number == current_pr else overlap.pr_a
        format_pr_entry(counterpart, lines)
        format_line_overlaps(overlap.line_overlaps, lines)
    lines.append("")
def format_medium_risk_section(
    medium_risk: list[tuple],
    current_pr: int,
    changes_current: dict,
    all_changes: dict,
    lines: list[str]
):
    """Append the medium-risk section (some overlapping lines or a rename)."""
    if not medium_risk:
        return
    lines.append("### 🟡 Medium Risk — Some Line Overlap\n")
    lines.append("These PRs have some overlapping changes:\n")
    for overlap, _ in medium_risk:
        counterpart = overlap.pr_b if overlap.pr_a.number == current_pr else overlap.pr_a
        their_changes = all_changes.get(counterpart.number, {})
        format_pr_entry(counterpart, lines)
        # Flag the first shared file involved in a rename on either side.
        for path in overlap.overlapping_files:
            mine = changes_current.get(path)
            theirs = their_changes.get(path)
            if (mine and mine.is_rename) or (theirs and theirs.is_rename):
                lines.append(f" - ⚠️ `{path}` is being renamed/moved")
                break
        if overlap.line_overlaps:
            for path, spans in overlap.line_overlaps.items():
                rendered = [f"L{lo}-{hi}" if lo != hi else f"L{lo}" for lo, hi in spans]
                lines.append(f" - `{path}`: {', '.join(rendered)}")
        else:
            visible = [f for f in overlap.overlapping_files if not should_ignore_file(f)]
            if visible:
                lines.append(f" - Shared files: `{'`, `'.join(visible[:5])}`")
        lines.append("")
def format_low_risk_section(low_risk: list[tuple], current_pr: int, lines: list[str]):
    """Append the collapsed low-risk (file overlap only) section."""
    if not low_risk:
        return
    lines.append("### 🟢 Low Risk — File Overlap Only\n")
    lines.append("<details><summary>These PRs touch the same files but different sections (click to expand)</summary>\n")
    for overlap, _ in low_risk:
        counterpart = overlap.pr_b if overlap.pr_a.number == current_pr else overlap.pr_a
        visible = [f for f in overlap.overlapping_files if not should_ignore_file(f)]
        if not visible:
            continue
        format_pr_entry(counterpart, lines)
        if overlap.line_overlaps:
            for path, spans in overlap.line_overlaps.items():
                rendered = [f"L{lo}-{hi}" if lo != hi else f"L{lo}" for lo, hi in spans]
                lines.append(f" - `{path}`: {', '.join(rendered)}")
        else:
            lines.append(f" - Shared files: `{'`, `'.join(visible[:5])}`")
    lines.append("\n</details>\n")
def format_pr_entry(pr: "PullRequest", lines: list[str]):
    """Append one markdown bullet describing a PR (number, author, age, link)."""
    age = format_relative_time(pr.updated_at)
    suffix = f" · updated {age}" if age else ""
    lines.append(f"- **#{pr.number}** ({pr.author}{suffix}): [{pr.title}]({pr.url})")
def format_conflict_details(overlap: "Overlap", lines: list[str]):
    """Append per-file conflict details, folding out a common directory prefix."""
    details = overlap.conflict_details
    if details:
        # Preferred path: we have marker counts from the test merge.
        prefix = find_common_prefix([d.path for d in details])
        if prefix:
            lines.append(f" - 📁 `{prefix}`")
        for d in details:
            shown = d.path[len(prefix):] if prefix else d.path
            lines.append(f" - `{shown}`{format_conflict_size(d)}")
    elif overlap.conflict_files:
        # Fallback: only the conflicted paths are known.
        prefix = find_common_prefix(overlap.conflict_files)
        if prefix:
            lines.append(f" - 📁 `{prefix}`")
        for path in overlap.conflict_files:
            shown = path[len(prefix):] if prefix else path
            lines.append(f" - `{shown}`")
def format_conflict_size(detail: "ConflictInfo") -> str:
    """Describe the size or kind of one file's conflict for display."""
    count = detail.conflict_count
    if count > 0:
        plural = 's' if count > 1 else ''
        return f" ({count} conflict{plural}, ~{detail.conflict_lines} lines)"
    if detail.conflict_type != 'content':
        # Non-content conflicts (add/add, delete/modify, ...) get a label.
        type_labels = {
            'both_added': 'added in both',
            'both_deleted': 'deleted in both',
            'deleted_by_us': 'deleted here, modified there',
            'deleted_by_them': 'modified here, deleted there',
            'added_by_us': 'added here',
            'added_by_them': 'added there',
        }
        label = type_labels.get(detail.conflict_type, detail.conflict_type)
        return f" ({label})"
    return ""
def format_line_overlaps(line_overlaps: dict[str, list[tuple]], lines: list[str]):
    """Append one bullet per file listing its overlapping line ranges."""
    all_paths = list(line_overlaps.keys())
    # Only bother extracting a shared prefix when several files are listed.
    common_prefix = find_common_prefix(all_paths) if len(all_paths) > 1 else ""
    if common_prefix:
        lines.append(f" - 📁 `{common_prefix}`")
    for file_path, ranges in line_overlaps.items():
        display_path = file_path[len(common_prefix):] if common_prefix else file_path
        # Single-line ranges render as "L5", multi-line as "L5-9".
        range_strs = [f"L{r[0]}-{r[1]}" if r[0] != r[1] else f"L{r[0]}" for r in ranges]
        # NOTE(review): both branches of this ternary are identical here —
        # presumably a deeper indent under the 📁 prefix was collapsed by
        # whitespace mangling; confirm the intended widths.
        indent = " " if common_prefix else " "
        lines.append(f"{indent}- `{display_path}`: {', '.join(range_strs)}")
# =============================================================================
# OVERLAP ANALYSIS
# =============================================================================
def classify_all_overlaps(
    overlaps: list["Overlap"],
    current_pr: int,
    changes_current: dict,
    all_changes: dict
) -> list[tuple["Overlap", str]]:
    """Attach a risk label to every overlap and sort by severity."""
    rank = {'conflict': 0, 'high': 1, 'medium': 2, 'low': 3}
    labelled = []
    for overlap in overlaps:
        counterpart = overlap.pr_b if overlap.pr_a.number == current_pr else overlap.pr_a
        risk = classify_overlap_risk(
            overlap, changes_current, all_changes.get(counterpart.number, {})
        )
        labelled.append((overlap, risk))
    # Unknown labels sink to the bottom; sort is stable within a level.
    labelled.sort(key=lambda pair: rank.get(pair[1], 99))
    return labelled
def classify_overlap_risk(
    overlap: "Overlap",
    changes_a: dict[str, "ChangedFile"],
    changes_b: dict[str, "ChangedFile"]
) -> str:
    """Rank an overlap: 'conflict' > 'high' > 'medium' > 'low'.

    Thresholds: >20 overlapping lines is high, >5 is medium; a rename of
    any shared file bumps a no-line-overlap pair to medium.
    """
    if overlap.has_merge_conflict:
        return 'conflict'

    def _renamed(changes: dict, path: str) -> bool:
        entry = changes.get(path)
        return bool(entry and entry.is_rename)

    rename_involved = any(
        _renamed(changes_a, f) or _renamed(changes_b, f)
        for f in overlap.overlapping_files
    )
    if overlap.line_overlaps:
        overlapping_lines = sum(
            hi - lo + 1
            for spans in overlap.line_overlaps.values()
            for lo, hi in spans
        )
        if overlapping_lines > 20:
            return 'high'
        if overlapping_lines > 5:
            return 'medium'
        return 'low'
    return 'medium' if rename_involved else 'low'
def find_line_overlaps(
    changes_a: dict[str, "ChangedFile"],
    changes_b: dict[str, "ChangedFile"],
    shared_files: list[str]
) -> dict[str, list[tuple[int, int]]]:
    """Map each shared file to its merged overlapping line ranges."""

    def _pure_rename(entry: "ChangedFile") -> bool:
        # A rename with no edited lines cannot conflict on content.
        return entry.is_rename and not entry.additions and not entry.deletions

    result = {}
    for path in shared_files:
        if should_ignore_file(path):
            continue
        mine = changes_a.get(path)
        theirs = changes_b.get(path)
        if mine is None or theirs is None:
            continue
        if _pure_rename(mine) or _pure_rename(theirs):
            continue
        hits = find_range_overlaps(
            mine.additions + mine.deletions,
            theirs.additions + theirs.deletions,
        )
        if hits:
            result[path] = merge_ranges(hits)
    return result
def find_range_overlaps(
    ranges_a: list[tuple[int, int]],
    ranges_b: list[tuple[int, int]]
) -> list[tuple[int, int]]:
    """Intersect every pair of inclusive ranges from the two lists (unmerged)."""
    hits = []
    for a_lo, a_hi in ranges_a:
        for b_lo, b_hi in ranges_b:
            lo, hi = max(a_lo, b_lo), min(a_hi, b_hi)
            # A non-empty intersection means the two ranges overlapped.
            if lo <= hi:
                hits.append((lo, hi))
    return hits
def ranges_overlap(range_a: tuple[int, int], range_b: tuple[int, int]) -> bool:
    """True when the two inclusive [start, end] ranges share at least one line."""
    start = max(range_a[0], range_b[0])
    end = min(range_a[1], range_b[1])
    return start <= end
def merge_ranges(ranges: list[tuple[int, int]]) -> list[tuple[int, int]]:
    """Coalesce overlapping or adjacent inclusive ranges into minimal form."""
    merged = []
    for lo, hi in sorted(ranges):
        # Adjacent counts too: (1,3) + (4,5) -> (1,5).
        if merged and lo <= merged[-1][1] + 1:
            prev_lo, prev_hi = merged[-1]
            merged[-1] = (prev_lo, max(prev_hi, hi))
        else:
            merged.append((lo, hi))
    return merged
# =============================================================================
# MERGE CONFLICT TESTING
# =============================================================================
def test_merge_conflict(
    owner: str,
    repo: str,
    base_branch: str,
    pr_a: "PullRequest",
    pr_b: "PullRequest"
) -> tuple[bool, list[str], list["ConflictInfo"], Optional[str]]:
    """Test if merging both PRs would cause a conflict.

    Clones the base branch into a temp dir, merges pr_a, then pr_b on top.
    Returns (has_conflict, files, details, kind): kind is
    'pr_a_conflicts_base' when pr_a already conflicts with the base,
    'conflict' when pr_b conflicts after pr_a, or None for a clean merge
    (or when the clone itself failed).
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        # Clone repo; give up quietly if that fails (e.g. network issues).
        if not clone_repo(owner, repo, base_branch, tmpdir):
            return False, [], [], None
        configure_git(tmpdir)
        fetch_pr_branches(tmpdir, pr_a.number, pr_b.number)
        # Try merging PR A first
        conflict_result = try_merge_pr(tmpdir, pr_a.number)
        if conflict_result:
            return True, conflict_result[0], conflict_result[1], 'pr_a_conflicts_base'
        # Commit the clean A-merge so PR B is tested on top of it.
        run_git(["commit", "-m", f"Merge PR #{pr_a.number}"], cwd=tmpdir, check=False)
        conflict_result = try_merge_pr(tmpdir, pr_b.number)
        if conflict_result:
            return True, conflict_result[0], conflict_result[1], 'conflict'
        return False, [], [], None
def clone_repo(owner: str, repo: str, branch: str, tmpdir: str) -> bool:
    """Shallow-clone one branch of the repo into tmpdir; False on failure."""
    url = f"https://github.com/{owner}/{repo}.git"
    cloned = run_git(
        ["clone", "--depth=50", "--branch", branch, url, tmpdir],
        check=False,
    )
    if cloned.returncode == 0:
        return True
    print(f"Failed to clone: {cloned.stderr}", file=sys.stderr)
    return False
def configure_git(tmpdir: str):
    """Set the bot identity so merge commits can be created in tmpdir."""
    identity = (
        ("user.email", "github-actions[bot]@users.noreply.github.com"),
        ("user.name", "github-actions[bot]"),
    )
    for key, value in identity:
        run_git(["config", key, value], cwd=tmpdir, check=False)
def fetch_pr_branches(tmpdir: str, pr_a: int, pr_b: int):
    """Fetch both PRs' head commits into local branches named pr-<n>."""
    for number in (pr_a, pr_b):
        run_git(["fetch", "origin", f"pull/{number}/head:pr-{number}"], cwd=tmpdir, check=False)
def try_merge_pr(tmpdir: str, pr_number: int) -> Optional[tuple[list[str], list["ConflictInfo"]]]:
    """Attempt to merge pr-<n>; None on success, (files, details) on conflict."""
    merge = run_git(["merge", "--no-commit", "--no-ff", f"pr-{pr_number}"], cwd=tmpdir, check=False)
    if merge.returncode == 0:
        return None
    # The failed merge left conflict state behind; capture it, then clean up.
    files, details = extract_conflict_info(tmpdir, merge.stderr)
    run_git(["merge", "--abort"], cwd=tmpdir, check=False)
    return files, details
def extract_conflict_info(tmpdir: str, stderr: str) -> tuple[list[str], list["ConflictInfo"]]:
    """Extract conflict information from git status.

    Primary source is `git status --porcelain` (two-letter XY codes mark
    unmerged paths); if that yields nothing, fall back to scraping the
    merge command's stderr for CONFLICT lines.
    """
    status_result = run_git(["status", "--porcelain"], cwd=tmpdir, check=False)
    # Porcelain XY codes that denote an unmerged path, mapped to a label.
    status_types = {
        'UU': 'content',
        'AA': 'both_added',
        'DD': 'both_deleted',
        'DU': 'deleted_by_us',
        'UD': 'deleted_by_them',
        'AU': 'added_by_us',
        'UA': 'added_by_them',
    }
    conflict_files = []
    conflict_details = []
    for line in status_result.stdout.split("\n"):
        # Porcelain format: "XY <path>" — two status letters, space, path.
        if len(line) >= 3 and line[0:2] in status_types:
            status_code = line[0:2]
            file_path = line[3:].strip()
            conflict_files.append(file_path)
            info = analyze_conflict_markers(file_path, tmpdir)
            info.conflict_type = status_types.get(status_code, 'unknown')
            conflict_details.append(info)
    # Fallback to stderr parsing when porcelain reported nothing.
    if not conflict_files and stderr:
        for line in stderr.split("\n"):
            if "CONFLICT" in line and ":" in line:
                parts = line.split(":")
                if len(parts) > 1:
                    # Last colon-separated field is the path in git's
                    # "CONFLICT (...): ... in <path>"-style messages.
                    file_part = parts[-1].strip()
                    if file_part and not file_part.startswith("Merge"):
                        conflict_files.append(file_part)
                        conflict_details.append(ConflictInfo(path=file_part))
    return conflict_files, conflict_details
def analyze_conflict_markers(file_path: str, cwd: str) -> "ConflictInfo":
    """Count conflict regions and their approximate line span in a file.

    Scans for the `<<<<<<<` / `>>>>>>>` marker pairs a failed merge leaves
    behind. Files that cannot be read (e.g. delete/modify conflicts leave
    no file on disk) yield an empty ConflictInfo instead of raising.
    """
    info = ConflictInfo(path=file_path)
    try:
        with open(os.path.join(cwd, file_path), 'r', errors='ignore') as handle:
            content = handle.read()
    except OSError:
        # Narrowed from a bare except: only I/O can fail here, and a
        # missing/unreadable file simply means "no markers to count".
        return info
    in_conflict = False
    current_span = 0
    for line in content.split('\n'):
        if line.startswith('<<<<<<<'):
            in_conflict = True
            info.conflict_count += 1
            current_span = 1  # the opening marker line itself
        elif line.startswith('>>>>>>>'):
            in_conflict = False
            current_span += 1  # include the closing marker line
            info.conflict_lines += current_span
        elif in_conflict:
            current_span += 1
    return info
# =============================================================================
# DIFF PARSING
# =============================================================================
def parse_diff_ranges(diff: str) -> dict[str, "ChangedFile"]:
    """Parse a unified diff and extract changed line ranges per file.

    Keys are new-side paths (from "+++ b/" headers); deleted files, whose
    new side is /dev/null, therefore do not appear. Rename metadata
    ("rename from" / "similarity index") is buffered and attached to the
    next file entry seen.
    """
    files = {}
    current_file = None
    pending_rename_from = None
    is_rename = False
    for line in diff.split("\n"):
        if line.startswith("rename from "):
            pending_rename_from = line[12:]
            is_rename = True
        elif line.startswith("rename to "):
            pass  # Just tracking rename; the "+++ b/" header carries the path
        elif line.startswith("similarity index"):
            is_rename = True
        elif line.startswith("+++ b/"):
            path = line[6:]
            current_file = ChangedFile(
                path=path,
                additions=[],
                deletions=[],
                is_rename=is_rename,
                old_path=pending_rename_from
            )
            files[path] = current_file
            # Rename state applies to exactly one file entry; reset it.
            pending_rename_from = None
            is_rename = False
        elif line.startswith("--- /dev/null"):
            # Newly added file: any pending rename state cannot apply to it.
            is_rename = False
            pending_rename_from = None
        elif line.startswith("@@") and current_file:
            parse_hunk_header(line, current_file)
    return files
def parse_hunk_header(line: str, current_file: "ChangedFile"):
    """Record a hunk header's old/new line spans on current_file.

    Hunk headers look like `@@ -old_start,old_count +new_start,new_count @@`;
    a missing count defaults to 1. A zero-count side (pure insert or pure
    delete) contributes no range. Non-matching lines are ignored.
    """
    m = re.match(r"@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@", line)
    if not m:
        return
    old_start, old_count = int(m.group(1)), int(m.group(2) or 1)
    new_start, new_count = int(m.group(3)), int(m.group(4) or 1)
    if old_count:
        current_file.deletions.append((old_start, old_start + old_count - 1))
    if new_count:
        current_file.additions.append((new_start, new_start + new_count - 1))
# =============================================================================
# GITHUB API
# =============================================================================
def get_repo_info() -> tuple[str, str]:
    """Resolve (owner, repo), preferring the Actions-provided env var."""
    slug = os.environ.get("GITHUB_REPOSITORY")
    if slug:
        owner, repo = slug.split("/")
        return owner, repo
    # Outside Actions, ask the gh CLI about the current directory's repo.
    info = json.loads(run_gh(["repo", "view", "--json", "owner,name"]).stdout)
    return info["owner"]["login"], info["name"]
def query_open_prs(owner: str, repo: str, base_branch: str) -> list[dict]:
    """Query all open PRs targeting the specified base branch.

    Pages through the GraphQL pullRequests connection 100 at a time.
    NOTE(review): files(first: 100) is not paginated, so PRs touching
    more than 100 files are silently truncated — confirm acceptable.
    Exits the process on GraphQL errors (same policy as run_gh failures).
    """
    prs = []
    cursor = None
    while True:
        # Cursor-based pagination: empty clause on the first request.
        after_clause = f', after: "{cursor}"' if cursor else ""
        query = f'''
query {{
repository(owner: "{owner}", name: "{repo}") {{
pullRequests(
first: 100{after_clause},
states: OPEN,
baseRefName: "{base_branch}",
orderBy: {{field: UPDATED_AT, direction: DESC}}
) {{
totalCount
edges {{
node {{
number
title
url
updatedAt
author {{ login }}
headRefName
baseRefName
files(first: 100) {{
nodes {{ path }}
}}
}}
}}
pageInfo {{
endCursor
hasNextPage
}}
}}
}}
}}
'''
        result = run_gh(["api", "graphql", "-f", f"query={query}"])
        data = json.loads(result.stdout)
        if "errors" in data:
            print(f"GraphQL errors: {data['errors']}", file=sys.stderr)
            sys.exit(1)
        pr_data = data["data"]["repository"]["pullRequests"]
        for edge in pr_data["edges"]:
            node = edge["node"]
            # Flatten the GraphQL shape into the plain dict the rest of
            # the tool consumes (see analyze_pr_overlap).
            prs.append({
                "number": node["number"],
                "title": node["title"],
                "url": node["url"],
                "updated_at": node.get("updatedAt"),
                "author": node["author"]["login"] if node["author"] else "unknown",
                "head_ref": node["headRefName"],
                "base_ref": node["baseRefName"],
                "files": [f["path"] for f in node["files"]["nodes"]]
            })
        if not pr_data["pageInfo"]["hasNextPage"]:
            break
        cursor = pr_data["pageInfo"]["endCursor"]
    return prs
def get_pr_diff(pr_number: int) -> str:
    """Return the unified diff text for a PR via the gh CLI."""
    return run_gh(["pr", "diff", str(pr_number)]).stdout
def post_or_update_comment(pr_number: int, body: str):
    """Post the overlap report as a comment on the PR.

    NOTE(review): despite the name, this always creates a *new* comment —
    plain `gh pr comment` never edits an existing one, so repeated pushes
    will stack comments. Consider gh's `--edit-last` flag or the comments
    API to update in place; confirm intended behavior.
    """
    if body:
        run_gh(["pr", "comment", str(pr_number), "--body", body])
def send_discord_notification(webhook_url: str, pr: "PullRequest", overlaps: list["Overlap"]):
    """Send a Discord webhook message listing this PR's merge conflicts.

    Only fires when at least one overlap is a confirmed merge conflict.
    Uses the stdlib HTTP client instead of shelling out to curl (which may
    not be installed on the runner). Delivery is best-effort: network
    errors are logged, never raised, so a flaky webhook cannot fail CI.
    """
    conflicts = [o for o in overlaps if o.has_merge_conflict]
    if not conflicts:
        return
    import urllib.request

    def other_side(o: "Overlap") -> "PullRequest":
        # Each Overlap pairs this PR with one other PR; report the other one.
        return o.pr_b if o.pr_a.number == pr.number else o.pr_a

    fields = []
    for o in conflicts:
        other = other_side(o)
        fields.append({
            "name": f"Conflicts with #{other.number}",
            "value": f"[{other.title}]({other.url})\nFiles: `{'`, `'.join(o.conflict_files[:3])}`",
            "inline": False,
        })
    embed = {
        "title": f"⚠️ PR #{pr.number} has merge conflicts",
        "description": f"[{pr.title}]({pr.url})",
        "color": 0xFF0000,
        "fields": fields,
    }
    payload = json.dumps({"embeds": [embed]}).encode("utf-8")
    request = urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=10):
            pass
    except OSError as exc:  # URLError/HTTPError are OSError subclasses
        print(f"Discord notification failed: {exc}", file=sys.stderr)
# =============================================================================
# UTILITIES
# =============================================================================
def run_gh(args: list[str], check: bool = True) -> subprocess.CompletedProcess:
    """Run `gh <args>`; with check=True, exit the program on failure."""
    proc = subprocess.run(
        ["gh", *args],
        capture_output=True,
        text=True,
        check=False,
    )
    if check and proc.returncode != 0:
        print(f"Error running gh {' '.join(args)}: {proc.stderr}", file=sys.stderr)
        sys.exit(1)
    return proc
def run_git(args: list[str], cwd: str = None, check: bool = True) -> subprocess.CompletedProcess:
    """Run `git <args>`; with check=True, log failures but keep going.

    Unlike run_gh, a failure here never exits the program — merge testing
    expects some git commands (merge, fetch) to fail and inspects the
    returned CompletedProcess instead.
    """
    proc = subprocess.run(
        ["git", *args],
        capture_output=True,
        text=True,
        cwd=cwd,
        check=False,
    )
    if check and proc.returncode != 0:
        print(f"Error running git {' '.join(args)}: {proc.stderr}", file=sys.stderr)
    return proc
def should_ignore_file(path: str) -> bool:
    """True for files excluded from overlap analysis (lock/generated files)."""
    # Match either the full repo-relative path or the bare filename.
    return path in IGNORE_FILES or path.rsplit("/", 1)[-1] in IGNORE_FILES
def find_common_prefix(paths: list[str]) -> str:
    """Longest shared directory prefix of paths ('' or ending in '/')."""
    if not paths:
        return ""
    if len(paths) == 1:
        # A lone path's "common prefix" is its own directory part.
        head, sep, _ = paths[0].rpartition('/')
        return head + '/' if sep else ""
    shared = []
    for segments in zip(*(p.split('/') for p in paths)):
        if any(seg != segments[0] for seg in segments):
            break
        shared.append(segments[0])
    return '/'.join(shared) + '/' if shared else ""
def format_relative_time(iso_timestamp: str) -> str:
    """Render an ISO-8601 timestamp as a coarse relative age ("3h ago").

    Returns "" for a missing or unparseable timestamp instead of raising,
    since the age annotation is purely cosmetic.
    """
    if not iso_timestamp:
        return ""
    from datetime import datetime, timezone
    try:
        # GitHub emits trailing-Z timestamps; fromisoformat() only accepts
        # an explicit offset before Python 3.11, hence the replace().
        then = datetime.fromisoformat(iso_timestamp.replace('Z', '+00:00'))
        age = (datetime.now(timezone.utc) - then).total_seconds()
    except (ValueError, TypeError):
        # Narrowed from a bare except. ValueError: malformed timestamp;
        # TypeError: a naive datetime (no offset) subtracted from aware now.
        return ""
    if age < 60:
        return "just now"
    if age < 3600:
        return f"{int(age / 60)}m ago"
    if age < 86400:
        return f"{int(age / 3600)}h ago"
    return f"{int(age / 86400)}d ago"
# =============================================================================
# DATA CLASSES
# =============================================================================
@dataclass
class ChangedFile:
    """Line-range summary of one file within a PR's diff."""
    # Repo-relative path (the new-side path when the file was renamed).
    path: str
    # Inclusive (start, end) line ranges on the new side of the diff.
    additions: list[tuple[int, int]]
    # Inclusive (start, end) line ranges on the old side of the diff.
    deletions: list[tuple[int, int]]
    # True when the diff carried rename/similarity metadata.
    is_rename: bool = False
    # Pre-rename path when known, else None.
    old_path: Optional[str] = None
@dataclass
class PullRequest:
    """Represents a pull request and its parsed diff."""
    number: int
    title: str
    # Author login ("unknown" when GraphQL returns no author).
    author: str
    url: str
    head_ref: str
    base_ref: str
    # All changed file paths, including ignored ones.
    files: list[str]
    # path -> ChangedFile, filled from the unified diff after construction.
    changed_ranges: dict[str, ChangedFile]
    # ISO-8601 last-update timestamp; None when not fetched.
    updated_at: Optional[str] = None
@dataclass
class ConflictInfo:
    """Info about a single conflicting file from a test merge."""
    # Repo-relative path of the conflicted file.
    path: str
    # Number of <<<<<<< ... >>>>>>> regions found in the file.
    conflict_count: int = 0
    # Total lines spanned by those regions (markers included).
    conflict_lines: int = 0
    # Label derived from git's porcelain status code, e.g. 'both_added'.
    conflict_type: str = "content"
@dataclass
class Overlap:
    """Represents an overlap between two PRs."""
    pr_a: PullRequest
    pr_b: PullRequest
    # Shared, non-ignored file paths.
    overlapping_files: list[str]
    # path -> merged overlapping inclusive (start, end) line ranges.
    line_overlaps: dict[str, list[tuple[int, int]]]
    # The three fields below are only populated after a real test merge.
    has_merge_conflict: bool = False
    conflict_files: Optional[list[str]] = None
    conflict_details: Optional[list[ConflictInfo]] = None
    # 'conflict' or 'pr_a_conflicts_base'; None when untested/clean.
    conflict_type: Optional[str] = None
    def __post_init__(self):
        # None sentinels avoid the shared-mutable-default pitfall; normalize
        # them to fresh lists here.
        if self.conflict_files is None:
            self.conflict_files = []
        if self.conflict_details is None:
            self.conflict_details = []
# =============================================================================
# CONSTANTS
# =============================================================================
# Generated/lock files whose overlaps are noise rather than real conflicts.
# should_ignore_file() matches entries against both the full repo-relative
# path and the bare filename.
IGNORE_FILES = {
    "autogpt_platform/frontend/src/app/api/openapi.json",
    "poetry.lock",
    "pnpm-lock.yaml",
    "package-lock.json",
    "yarn.lock",
}
# =============================================================================
# ENTRY POINT
# =============================================================================
# Run only when executed as a script (not when imported for reuse/tests).
if __name__ == "__main__":
    main()