diff --git a/.github/scripts/detect_overlaps.py b/.github/scripts/detect_overlaps.py
index eab2b856ec..8d8e5f4392 100644
--- a/.github/scripts/detect_overlaps.py
+++ b/.github/scripts/detect_overlaps.py
@@ -16,102 +16,774 @@ from dataclasses import dataclass
from typing import Optional
-@dataclass
-class ChangedFile:
- path: str
- additions: list[tuple[int, int]] # List of (start_line, end_line) ranges
- deletions: list[tuple[int, int]]
- is_rename: bool = False
- old_path: str = None
+# =============================================================================
+# MAIN ENTRY POINT
+# =============================================================================
+
+def main():
+ """Main entry point for PR overlap detection."""
+ import argparse
+
+ parser = argparse.ArgumentParser(description="Detect PR overlaps and potential merge conflicts")
+ parser.add_argument("pr_number", type=int, help="PR number to check")
+ parser.add_argument("--base", default=None, help="Base branch (default: auto-detect from PR)")
+ parser.add_argument("--skip-merge-test", action="store_true", help="Skip actual merge conflict testing")
+ parser.add_argument("--discord-webhook", default=os.environ.get("DISCORD_WEBHOOK_URL"), help="Discord webhook URL for notifications")
+ parser.add_argument("--dry-run", action="store_true", help="Don't post comments, just print")
+
+ args = parser.parse_args()
+
+ owner, repo = get_repo_info()
+ print(f"Checking PR #{args.pr_number} in {owner}/{repo}")
+
+ # Get current PR info
+ current_pr = fetch_pr_details(args.pr_number)
+ base_branch = args.base or current_pr.base_ref
+
+ print(f"PR #{current_pr.number}: {current_pr.title}")
+ print(f"Base branch: {base_branch}")
+ print(f"Files changed: {len(current_pr.files)}")
+
+ # Find overlapping PRs
+ overlaps, all_changes = find_overlapping_prs(
+ owner, repo, base_branch, current_pr, args.pr_number, args.skip_merge_test
+ )
+
+ if not overlaps:
+ print("No overlaps detected!")
+ return
+
+ # Generate and post report
+ comment = format_comment(overlaps, args.pr_number, current_pr.changed_ranges, all_changes)
+
+ if args.dry_run:
+ print("\n" + "="*60)
+ print("COMMENT PREVIEW:")
+ print("="*60)
+ print(comment)
+ else:
+ if comment:
+ post_or_update_comment(args.pr_number, comment)
+ print("Posted comment to PR")
+
+ if args.discord_webhook:
+ send_discord_notification(args.discord_webhook, current_pr, overlaps)
+
+ # Report results and exit
+ report_results(overlaps)
-# Files that are auto-generated or rarely cause real conflicts
-IGNORE_FILES = {
- "autogpt_platform/frontend/src/app/api/openapi.json", # Auto-generated from backend
- "poetry.lock", # Lock file, conflicts are usually trivial
- "pnpm-lock.yaml",
- "package-lock.json",
- "yarn.lock",
-}
+# =============================================================================
+# HIGH-LEVEL WORKFLOW FUNCTIONS
+# =============================================================================
+
+def fetch_pr_details(pr_number: int) -> "PullRequest":
+ """Fetch details for a specific PR including its diff."""
+ result = run_gh(["pr", "view", str(pr_number), "--json", "number,title,url,author,headRefName,baseRefName,files"])
+ data = json.loads(result.stdout)
+
+ pr = PullRequest(
+ number=data["number"],
+ title=data["title"],
+ author=data["author"]["login"],
+ url=data["url"],
+ head_ref=data["headRefName"],
+ base_ref=data["baseRefName"],
+ files=[f["path"] for f in data["files"]],
+ changed_ranges={}
+ )
+
+ # Get detailed diff
+ diff = get_pr_diff(pr_number)
+ pr.changed_ranges = parse_diff_ranges(diff)
+
+ return pr
-@dataclass
-class PullRequest:
- number: int
- title: str
- author: str
- url: str
- head_ref: str
- base_ref: str
- files: list[str]
- changed_ranges: dict[str, ChangedFile] # path -> ChangedFile
- updated_at: str = None # ISO timestamp
+def find_overlapping_prs(
+ owner: str,
+ repo: str,
+ base_branch: str,
+ current_pr: "PullRequest",
+ current_pr_number: int,
+ skip_merge_test: bool
+) -> tuple[list["Overlap"], dict[int, dict[str, "ChangedFile"]]]:
+ """Find all PRs that overlap with the current PR."""
+ # Query other open PRs
+ all_prs = query_open_prs(owner, repo, base_branch)
+ other_prs = [p for p in all_prs if p["number"] != current_pr_number]
+
+ print(f"Found {len(other_prs)} other open PRs targeting {base_branch}")
+
+ # Find file overlaps (excluding ignored files)
+ candidates = find_file_overlap_candidates(current_pr.files, other_prs)
+
+ print(f"Found {len(candidates)} PRs with file overlap (excluding ignored files)")
+
+ if not candidates:
+ return [], {}
+
+ # Analyze each candidate for line overlaps and conflicts
+ overlaps = []
+ all_changes = {}
+
+ for pr_data, shared_files in candidates:
+ overlap, pr_changes = analyze_pr_overlap(
+ owner, repo, base_branch, current_pr, pr_data, shared_files, skip_merge_test
+ )
+ if overlap:
+ overlaps.append(overlap)
+ all_changes[pr_data["number"]] = pr_changes
+
+ return overlaps, all_changes
-@dataclass
-class ConflictInfo:
- """Info about a single conflicting file."""
- path: str
- conflict_count: int = 0 # Number of conflict regions
- conflict_lines: int = 0 # Total lines in conflict regions
- conflict_type: str = "content" # content, added, deleted, renamed, binary
+def analyze_pr_overlap(
+ owner: str,
+ repo: str,
+ base_branch: str,
+ current_pr: "PullRequest",
+ other_pr_data: dict,
+ shared_files: list[str],
+ skip_merge_test: bool
+) -> tuple[Optional["Overlap"], dict[str, "ChangedFile"]]:
+ """Analyze overlap between current PR and another PR."""
+ # Filter out ignored files
+ non_ignored_shared = [f for f in shared_files if not should_ignore_file(f)]
+ if not non_ignored_shared:
+ return None, {}
+
+ other_pr = PullRequest(
+ number=other_pr_data["number"],
+ title=other_pr_data["title"],
+ author=other_pr_data["author"],
+ url=other_pr_data["url"],
+ head_ref=other_pr_data["head_ref"],
+ base_ref=other_pr_data["base_ref"],
+ files=other_pr_data["files"],
+ changed_ranges={},
+ updated_at=other_pr_data.get("updated_at")
+ )
+
+ # Get diff for other PR
+ other_diff = get_pr_diff(other_pr.number)
+ other_pr.changed_ranges = parse_diff_ranges(other_diff)
+
+ # Check line overlaps
+ line_overlaps = find_line_overlaps(
+ current_pr.changed_ranges,
+ other_pr.changed_ranges,
+ shared_files
+ )
+
+ overlap = Overlap(
+ pr_a=current_pr,
+ pr_b=other_pr,
+ overlapping_files=non_ignored_shared,
+ line_overlaps=line_overlaps
+ )
+
+ # Test for actual merge conflicts if we have line overlaps
+ if line_overlaps and not skip_merge_test:
+ print(f"Testing merge conflict with PR #{other_pr.number}...", flush=True)
+ has_conflict, conflict_files, conflict_details, error_type = test_merge_conflict(
+ owner, repo, base_branch, current_pr, other_pr
+ )
+ overlap.has_merge_conflict = has_conflict
+ overlap.conflict_files = conflict_files
+ overlap.conflict_details = conflict_details
+ overlap.conflict_type = error_type
+
+ return overlap, other_pr.changed_ranges
-@dataclass
-class Overlap:
- pr_a: PullRequest
- pr_b: PullRequest
- overlapping_files: list[str]
- line_overlaps: dict[str, list[tuple[int, int]]] # file -> overlapping line ranges
- has_merge_conflict: bool = False
- conflict_files: list[str] = None
- conflict_details: list[ConflictInfo] = None # Detailed conflict info per file
- conflict_type: str = None # None, 'pr_a_conflicts_base', 'conflict'
-
- def __post_init__(self):
- if self.conflict_files is None:
- self.conflict_files = []
- if self.conflict_details is None:
- self.conflict_details = []
+def find_file_overlap_candidates(
+ current_files: list[str],
+ other_prs: list[dict]
+) -> list[tuple[dict, list[str]]]:
+ """Find PRs that share files with the current PR."""
+ current_files_set = set(f for f in current_files if not should_ignore_file(f))
+ candidates = []
+
+ for pr_data in other_prs:
+ other_files = set(f for f in pr_data["files"] if not should_ignore_file(f))
+ shared = current_files_set & other_files
+
+ if shared:
+ candidates.append((pr_data, list(shared)))
+
+ return candidates
-def run_gh(args: list[str], check: bool = True) -> subprocess.CompletedProcess:
- """Run a gh CLI command."""
- result = subprocess.run(
- ["gh"] + args,
- capture_output=True,
- text=True,
+def report_results(overlaps: list["Overlap"]):
+ """Report results and exit with appropriate code."""
+ conflicts = [o for o in overlaps if o.has_merge_conflict]
+ if conflicts:
+ print(f"\n⚠️ Found {len(conflicts)} merge conflict(s)")
+ sys.exit(1)
+
+ line_overlap_count = len([o for o in overlaps if o.line_overlaps])
+ if line_overlap_count:
+ print(f"\n⚠️ Found {line_overlap_count} PR(s) with line overlap")
+
+ print("\n✅ Done")
+
+
+# =============================================================================
+# COMMENT FORMATTING
+# =============================================================================
+
+def format_comment(
+ overlaps: list["Overlap"],
+ current_pr: int,
+ changes_current: dict[str, "ChangedFile"],
+ all_changes: dict[int, dict[str, "ChangedFile"]]
+) -> str:
+ """Format the overlap report as a PR comment."""
+ if not overlaps:
+ return ""
+
+ lines = ["## 🔍 PR Overlap Detection"]
+ lines.append("")
+ lines.append("This check compares your PR against all other open PRs targeting the same branch to detect potential merge conflicts early.")
+ lines.append("")
+
+ # Check if current PR conflicts with base branch
+ format_base_conflicts(overlaps, lines)
+
+ # Classify and sort overlaps
+ classified = classify_all_overlaps(overlaps, current_pr, changes_current, all_changes)
+
+ # Group by risk
+ conflicts = [(o, r) for o, r in classified if r == 'conflict']
+ high_risk = [(o, r) for o, r in classified if r == 'high']
+ medium_risk = [(o, r) for o, r in classified if r == 'medium']
+ low_risk = [(o, r) for o, r in classified if r == 'low']
+
+ # Format each section
+ format_conflicts_section(conflicts, current_pr, lines)
+ format_high_risk_section(high_risk, current_pr, lines)
+ format_medium_risk_section(medium_risk, current_pr, changes_current, all_changes, lines)
+ format_low_risk_section(low_risk, current_pr, lines)
+
+ # Summary
+ total = len(overlaps)
+ lines.append(f"\n**Summary:** {len(conflicts)} conflicts, {len(high_risk)} high risk, {len(medium_risk)} medium risk, {len(low_risk)} low risk (out of {total} PRs with file overlap)")
+ lines.append("\n---\n*Auto-generated on push. Ignores: `openapi.json`, lock files.*")
+
+ return "\n".join(lines)
+
+
+def format_base_conflicts(overlaps: list["Overlap"], lines: list[str]):
+ """Format base branch conflicts section."""
+ base_conflicts = [o for o in overlaps if o.conflict_type == 'pr_a_conflicts_base']
+ if base_conflicts:
+ lines.append("### ⚠️ This PR has conflicts with the base branch\n")
+ lines.append("Conflicts will need to be resolved before merging:\n")
+ first = base_conflicts[0]
+ for f in first.conflict_files[:10]:
+ lines.append(f"- `{f}`")
+ if len(first.conflict_files) > 10:
+ lines.append(f"- ... and {len(first.conflict_files) - 10} more files")
+ lines.append("\n")
+
+
+def format_conflicts_section(conflicts: list[tuple], current_pr: int, lines: list[str]):
+ """Format the merge conflicts section."""
+ pr_conflicts = [(o, r) for o, r in conflicts if o.conflict_type != 'pr_a_conflicts_base']
+
+ if not pr_conflicts:
+ return
+
+ lines.append("### 🔴 Merge Conflicts Detected")
+ lines.append("")
+ lines.append("The following PRs have been tested and **will have merge conflicts** if merged after this PR. Consider coordinating with the authors.")
+ lines.append("")
+
+ for o, _ in pr_conflicts:
+ other = o.pr_b if o.pr_a.number == current_pr else o.pr_a
+ format_pr_entry(other, lines)
+ format_conflict_details(o, lines)
+ lines.append("")
+
+
+def format_high_risk_section(high_risk: list[tuple], current_pr: int, lines: list[str]):
+ """Format the high risk section."""
+ if not high_risk:
+ return
+
+ lines.append("### 🟠 High Risk — Significant Line Overlap")
+ lines.append("")
+ lines.append("These PRs modify many of the same lines (>20 lines). While not yet tested for conflicts, they have high potential to conflict.")
+ lines.append("")
+
+ for o, _ in high_risk:
+ other = o.pr_b if o.pr_a.number == current_pr else o.pr_a
+ format_pr_entry(other, lines)
+ format_line_overlaps(o.line_overlaps, lines)
+ lines.append("")
+
+
+def format_medium_risk_section(
+ medium_risk: list[tuple],
+ current_pr: int,
+ changes_current: dict,
+ all_changes: dict,
+ lines: list[str]
+):
+ """Format the medium risk section."""
+ if not medium_risk:
+ return
+
+ lines.append("### 🟡 Medium Risk — Some Line Overlap\n")
+ lines.append("These PRs have some overlapping changes:\n")
+
+ for o, _ in medium_risk:
+ other = o.pr_b if o.pr_a.number == current_pr else o.pr_a
+ other_changes = all_changes.get(other.number, {})
+ format_pr_entry(other, lines)
+
+ # Note if rename is involved
+ for file_path in o.overlapping_files:
+ file_a = changes_current.get(file_path)
+ file_b = other_changes.get(file_path)
+ if (file_a and file_a.is_rename) or (file_b and file_b.is_rename):
+ lines.append(f" - ⚠️ `{file_path}` is being renamed/moved")
+ break
+
+ if o.line_overlaps:
+ for file_path, ranges in o.line_overlaps.items():
+ range_strs = [f"L{r[0]}-{r[1]}" if r[0] != r[1] else f"L{r[0]}" for r in ranges]
+ lines.append(f" - `{file_path}`: {', '.join(range_strs)}")
+ else:
+ non_ignored = [f for f in o.overlapping_files if not should_ignore_file(f)]
+ if non_ignored:
+ lines.append(f" - Shared files: `{'`, `'.join(non_ignored[:5])}`")
+ lines.append("")
+
+
+def format_low_risk_section(low_risk: list[tuple], current_pr: int, lines: list[str]):
+ """Format the low risk section."""
+ if not low_risk:
+ return
+
+ lines.append("### 🟢 Low Risk — File Overlap Only\n")
+ lines.append("These PRs touch the same files but different sections (click to expand)
\n")
+
+ for o, _ in low_risk:
+ other = o.pr_b if o.pr_a.number == current_pr else o.pr_a
+ non_ignored = [f for f in o.overlapping_files if not should_ignore_file(f)]
+ if non_ignored:
+ format_pr_entry(other, lines)
+ if o.line_overlaps:
+ for file_path, ranges in o.line_overlaps.items():
+ range_strs = [f"L{r[0]}-{r[1]}" if r[0] != r[1] else f"L{r[0]}" for r in ranges]
+ lines.append(f" - `{file_path}`: {', '.join(range_strs)}")
+ else:
+ lines.append(f" - Shared files: `{'`, `'.join(non_ignored[:5])}`")
+
+ lines.append("\n \n")
+
+
+def format_pr_entry(pr: "PullRequest", lines: list[str]):
+ """Format a single PR entry line."""
+ updated = format_relative_time(pr.updated_at)
+ updated_str = f" · updated {updated}" if updated else ""
+ lines.append(f"- **#{pr.number}** ({pr.author}{updated_str}): [{pr.title}]({pr.url})")
+
+
+def format_conflict_details(overlap: "Overlap", lines: list[str]):
+ """Format conflict details for a PR."""
+ if overlap.conflict_details:
+ all_paths = [d.path for d in overlap.conflict_details]
+ common_prefix = find_common_prefix(all_paths)
+ if common_prefix:
+ lines.append(f" - 📁 `{common_prefix}`")
+ for detail in overlap.conflict_details:
+ display_path = detail.path[len(common_prefix):] if common_prefix else detail.path
+ size_str = format_conflict_size(detail)
+ lines.append(f" - `{display_path}`{size_str}")
+ elif overlap.conflict_files:
+ common_prefix = find_common_prefix(overlap.conflict_files)
+ if common_prefix:
+ lines.append(f" - 📁 `{common_prefix}`")
+ for f in overlap.conflict_files:
+ display_path = f[len(common_prefix):] if common_prefix else f
+ lines.append(f" - `{display_path}`")
+
+
+def format_conflict_size(detail: "ConflictInfo") -> str:
+ """Format conflict size string for a file."""
+ if detail.conflict_count > 0:
+ return f" ({detail.conflict_count} conflict{'s' if detail.conflict_count > 1 else ''}, ~{detail.conflict_lines} lines)"
+ elif detail.conflict_type != 'content':
+ type_labels = {
+ 'both_added': 'added in both',
+ 'both_deleted': 'deleted in both',
+ 'deleted_by_us': 'deleted here, modified there',
+ 'deleted_by_them': 'modified here, deleted there',
+ 'added_by_us': 'added here',
+ 'added_by_them': 'added there',
+ }
+ label = type_labels.get(detail.conflict_type, detail.conflict_type)
+ return f" ({label})"
+ return ""
+
+
+def format_line_overlaps(line_overlaps: dict[str, list[tuple]], lines: list[str]):
+ """Format line overlap details."""
+ all_paths = list(line_overlaps.keys())
+ common_prefix = find_common_prefix(all_paths) if len(all_paths) > 1 else ""
+ if common_prefix:
+ lines.append(f" - 📁 `{common_prefix}`")
+ for file_path, ranges in line_overlaps.items():
+ display_path = file_path[len(common_prefix):] if common_prefix else file_path
+ range_strs = [f"L{r[0]}-{r[1]}" if r[0] != r[1] else f"L{r[0]}" for r in ranges]
+ indent = " " if common_prefix else " "
+ lines.append(f"{indent}- `{display_path}`: {', '.join(range_strs)}")
+
+
+# =============================================================================
+# OVERLAP ANALYSIS
+# =============================================================================
+
+def classify_all_overlaps(
+ overlaps: list["Overlap"],
+ current_pr: int,
+ changes_current: dict,
+ all_changes: dict
+) -> list[tuple["Overlap", str]]:
+ """Classify all overlaps by risk level and sort them."""
+ classified = []
+ for o in overlaps:
+ other_pr = o.pr_b if o.pr_a.number == current_pr else o.pr_a
+ other_changes = all_changes.get(other_pr.number, {})
+ risk = classify_overlap_risk(o, changes_current, other_changes)
+ classified.append((o, risk))
+
+ risk_order = {'conflict': 0, 'high': 1, 'medium': 2, 'low': 3}
+ classified.sort(key=lambda x: risk_order.get(x[1], 99))
+
+ return classified
+
+
+def classify_overlap_risk(
+ overlap: "Overlap",
+ changes_a: dict[str, "ChangedFile"],
+ changes_b: dict[str, "ChangedFile"]
+) -> str:
+ """Classify the risk level of an overlap."""
+ if overlap.has_merge_conflict:
+ return 'conflict'
+
+ has_rename = any(
+ (changes_a.get(f) and changes_a[f].is_rename) or
+ (changes_b.get(f) and changes_b[f].is_rename)
+ for f in overlap.overlapping_files
+ )
+
+ if overlap.line_overlaps:
+ total_overlap_lines = sum(
+ end - start + 1
+ for ranges in overlap.line_overlaps.values()
+ for start, end in ranges
+ )
+
+ if total_overlap_lines > 20:
+ return 'high'
+ elif total_overlap_lines > 5:
+ return 'medium'
+ else:
+ return 'low'
+
+ if has_rename:
+ return 'medium'
+
+ return 'low'
+
+
+def find_line_overlaps(
+ changes_a: dict[str, "ChangedFile"],
+ changes_b: dict[str, "ChangedFile"],
+ shared_files: list[str]
+) -> dict[str, list[tuple[int, int]]]:
+ """Find overlapping line ranges in shared files."""
+ overlaps = {}
+
+ for file_path in shared_files:
+ if should_ignore_file(file_path):
+ continue
+
+ file_a = changes_a.get(file_path)
+ file_b = changes_b.get(file_path)
+
+ if not file_a or not file_b:
+ continue
+
+ # Skip pure renames
+ if file_a.is_rename and not file_a.additions and not file_a.deletions:
+ continue
+ if file_b.is_rename and not file_b.additions and not file_b.deletions:
+ continue
+
+ file_overlaps = find_range_overlaps(
+ file_a.additions + file_a.deletions,
+ file_b.additions + file_b.deletions
+ )
+
+ if file_overlaps:
+ overlaps[file_path] = merge_ranges(file_overlaps)
+
+ return overlaps
+
+
+def find_range_overlaps(
+ ranges_a: list[tuple[int, int]],
+ ranges_b: list[tuple[int, int]]
+) -> list[tuple[int, int]]:
+ """Find overlapping regions between two sets of ranges."""
+ overlaps = []
+ for range_a in ranges_a:
+ for range_b in ranges_b:
+ if ranges_overlap(range_a, range_b):
+ overlap_start = max(range_a[0], range_b[0])
+ overlap_end = min(range_a[1], range_b[1])
+ overlaps.append((overlap_start, overlap_end))
+ return overlaps
+
+
+def ranges_overlap(range_a: tuple[int, int], range_b: tuple[int, int]) -> bool:
+ """Check if two line ranges overlap."""
+ return range_a[0] <= range_b[1] and range_b[0] <= range_a[1]
+
+
+def merge_ranges(ranges: list[tuple[int, int]]) -> list[tuple[int, int]]:
+ """Merge overlapping line ranges."""
+ if not ranges:
+ return []
+
+ sorted_ranges = sorted(ranges, key=lambda x: x[0])
+ merged = [sorted_ranges[0]]
+
+ for current in sorted_ranges[1:]:
+ last = merged[-1]
+ if current[0] <= last[1] + 1:
+ merged[-1] = (last[0], max(last[1], current[1]))
+ else:
+ merged.append(current)
+
+ return merged
+
+
+# =============================================================================
+# MERGE CONFLICT TESTING
+# =============================================================================
+
+def test_merge_conflict(
+ owner: str,
+ repo: str,
+ base_branch: str,
+ pr_a: "PullRequest",
+ pr_b: "PullRequest"
+) -> tuple[bool, list[str], list["ConflictInfo"], str]:
+ """Test if merging both PRs would cause a conflict."""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ # Clone repo
+ if not clone_repo(owner, repo, base_branch, tmpdir):
+ return False, [], [], None
+
+ configure_git(tmpdir)
+ fetch_pr_branches(tmpdir, pr_a.number, pr_b.number)
+
+ # Try merging PR A first
+ conflict_result = try_merge_pr(tmpdir, pr_a.number)
+ if conflict_result:
+ return True, conflict_result[0], conflict_result[1], 'pr_a_conflicts_base'
+
+ # Commit and try merging PR B
+ run_git(["commit", "-m", f"Merge PR #{pr_a.number}"], cwd=tmpdir, check=False)
+
+ conflict_result = try_merge_pr(tmpdir, pr_b.number)
+ if conflict_result:
+ return True, conflict_result[0], conflict_result[1], 'conflict'
+
+ return False, [], [], None
+
+
+def clone_repo(owner: str, repo: str, branch: str, tmpdir: str) -> bool:
+ """Clone the repository."""
+ clone_url = f"https://github.com/{owner}/{repo}.git"
+ result = run_git(
+ ["clone", "--depth=50", "--branch", branch, clone_url, tmpdir],
check=False
)
- if check and result.returncode != 0:
- print(f"Error running gh {' '.join(args)}: {result.stderr}", file=sys.stderr)
- if check:
- sys.exit(1)
- return result
+ if result.returncode != 0:
+ print(f"Failed to clone: {result.stderr}", file=sys.stderr)
+ return False
+ return True
-def run_git(args: list[str], cwd: str = None, check: bool = True) -> subprocess.CompletedProcess:
- """Run a git command."""
- result = subprocess.run(
- ["git"] + args,
- capture_output=True,
- text=True,
- cwd=cwd,
- check=False
- )
- if check and result.returncode != 0:
- print(f"Error running git {' '.join(args)}: {result.stderr}", file=sys.stderr)
- return result
+def configure_git(tmpdir: str):
+ """Configure git for commits."""
+ run_git(["config", "user.email", "github-actions[bot]@users.noreply.github.com"], cwd=tmpdir, check=False)
+ run_git(["config", "user.name", "github-actions[bot]"], cwd=tmpdir, check=False)
+def fetch_pr_branches(tmpdir: str, pr_a: int, pr_b: int):
+ """Fetch both PR branches."""
+ run_git(["fetch", "origin", f"pull/{pr_a}/head:pr-{pr_a}"], cwd=tmpdir, check=False)
+ run_git(["fetch", "origin", f"pull/{pr_b}/head:pr-{pr_b}"], cwd=tmpdir, check=False)
+
+
+def try_merge_pr(tmpdir: str, pr_number: int) -> Optional[tuple[list[str], list["ConflictInfo"]]]:
+ """Try to merge a PR. Returns conflict info if conflicts, None if success."""
+ result = run_git(["merge", "--no-commit", "--no-ff", f"pr-{pr_number}"], cwd=tmpdir, check=False)
+
+ if result.returncode == 0:
+ return None
+
+ # Conflict detected
+ conflict_files, conflict_details = extract_conflict_info(tmpdir, result.stderr)
+ run_git(["merge", "--abort"], cwd=tmpdir, check=False)
+
+ return conflict_files, conflict_details
+
+
+def extract_conflict_info(tmpdir: str, stderr: str) -> tuple[list[str], list["ConflictInfo"]]:
+ """Extract conflict information from git status."""
+ status_result = run_git(["status", "--porcelain"], cwd=tmpdir, check=False)
+
+ status_types = {
+ 'UU': 'content',
+ 'AA': 'both_added',
+ 'DD': 'both_deleted',
+ 'DU': 'deleted_by_us',
+ 'UD': 'deleted_by_them',
+ 'AU': 'added_by_us',
+ 'UA': 'added_by_them',
+ }
+
+ conflict_files = []
+ conflict_details = []
+
+ for line in status_result.stdout.split("\n"):
+ if len(line) >= 3 and line[0:2] in status_types:
+ status_code = line[0:2]
+ file_path = line[3:].strip()
+ conflict_files.append(file_path)
+
+ info = analyze_conflict_markers(file_path, tmpdir)
+ info.conflict_type = status_types.get(status_code, 'unknown')
+ conflict_details.append(info)
+
+ # Fallback to stderr parsing
+ if not conflict_files and stderr:
+ for line in stderr.split("\n"):
+ if "CONFLICT" in line and ":" in line:
+ parts = line.split(":")
+ if len(parts) > 1:
+ file_part = parts[-1].strip()
+ if file_part and not file_part.startswith("Merge"):
+ conflict_files.append(file_part)
+ conflict_details.append(ConflictInfo(path=file_part))
+
+ return conflict_files, conflict_details
+
+
+def analyze_conflict_markers(file_path: str, cwd: str) -> "ConflictInfo":
+ """Analyze a conflicted file to count conflict regions and lines."""
+ info = ConflictInfo(path=file_path)
+
+ try:
+ full_path = os.path.join(cwd, file_path)
+ with open(full_path, 'r', errors='ignore') as f:
+ content = f.read()
+
+ in_conflict = False
+ current_conflict_lines = 0
+
+ for line in content.split('\n'):
+ if line.startswith('<<<<<<<'):
+ in_conflict = True
+ info.conflict_count += 1
+ current_conflict_lines = 1
+ elif line.startswith('>>>>>>>'):
+ in_conflict = False
+ current_conflict_lines += 1
+ info.conflict_lines += current_conflict_lines
+ elif in_conflict:
+ current_conflict_lines += 1
+ except:
+ pass
+
+ return info
+
+
+# =============================================================================
+# DIFF PARSING
+# =============================================================================
+
+def parse_diff_ranges(diff: str) -> dict[str, "ChangedFile"]:
+ """Parse a unified diff and extract changed line ranges per file."""
+ files = {}
+ current_file = None
+ pending_rename_from = None
+ is_rename = False
+
+ for line in diff.split("\n"):
+ if line.startswith("rename from "):
+ pending_rename_from = line[12:]
+ is_rename = True
+ elif line.startswith("rename to "):
+ pass # Just tracking rename
+ elif line.startswith("similarity index"):
+ is_rename = True
+ elif line.startswith("+++ b/"):
+ path = line[6:]
+ current_file = ChangedFile(
+ path=path,
+ additions=[],
+ deletions=[],
+ is_rename=is_rename,
+ old_path=pending_rename_from
+ )
+ files[path] = current_file
+ pending_rename_from = None
+ is_rename = False
+ elif line.startswith("--- /dev/null"):
+ is_rename = False
+ pending_rename_from = None
+ elif line.startswith("@@") and current_file:
+ parse_hunk_header(line, current_file)
+
+ return files
+
+
+def parse_hunk_header(line: str, current_file: "ChangedFile"):
+ """Parse a diff hunk header and add ranges to the file."""
+ match = re.match(r"@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@", line)
+ if match:
+ old_start = int(match.group(1))
+ old_count = int(match.group(2) or 1)
+ new_start = int(match.group(3))
+ new_count = int(match.group(4) or 1)
+
+ if old_count > 0:
+ current_file.deletions.append((old_start, old_start + old_count - 1))
+ if new_count > 0:
+ current_file.additions.append((new_start, new_start + new_count - 1))
+
+
+# =============================================================================
+# GITHUB API
+# =============================================================================
+
def get_repo_info() -> tuple[str, str]:
- """Get owner and repo name from current directory or environment."""
- # Try environment first (for GitHub Actions)
+ """Get owner and repo name from environment or git."""
if os.environ.get("GITHUB_REPOSITORY"):
owner, repo = os.environ["GITHUB_REPOSITORY"].split("/")
return owner, repo
- # Fall back to gh repo view
result = run_gh(["repo", "view", "--json", "owner,name"])
data = json.loads(result.stdout)
return data["owner"]["login"], data["name"]
@@ -185,329 +857,83 @@ def query_open_prs(owner: str, repo: str, base_branch: str) -> list[dict]:
return prs
-def parse_diff_ranges(diff: str) -> dict[str, ChangedFile]:
- """Parse a unified diff and extract changed line ranges per file."""
- files = {}
- current_file = None
- pending_rename_from = None
- pending_rename_to = None
- is_rename = False
-
- lines = diff.split("\n")
- i = 0
- while i < len(lines):
- line = lines[i]
-
- # Detect rename: "rename from path" followed by "rename to path"
- if line.startswith("rename from "):
- pending_rename_from = line[12:]
- is_rename = True
- elif line.startswith("rename to "):
- pending_rename_to = line[10:]
-
- # Also detect similarity index (indicates rename with modifications)
- elif line.startswith("similarity index"):
- is_rename = True
-
- # Match file header: +++ b/path/to/file
- elif line.startswith("+++ b/"):
- path = line[6:]
- current_file = ChangedFile(
- path=path,
- additions=[],
- deletions=[],
- is_rename=is_rename,
- old_path=pending_rename_from
- )
- files[path] = current_file
- # Reset rename tracking for next file
- pending_rename_from = None
- pending_rename_to = None
- is_rename = False
-
- # Match new file (--- /dev/null means new file, not rename)
- elif line.startswith("--- /dev/null"):
- is_rename = False
- pending_rename_from = None
-
- # Match hunk header: @@ -start,count +start,count @@
- elif line.startswith("@@") and current_file:
- match = re.match(r"@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@", line)
- if match:
- old_start = int(match.group(1))
- old_count = int(match.group(2) or 1)
- new_start = int(match.group(3))
- new_count = int(match.group(4) or 1)
-
- if old_count > 0:
- current_file.deletions.append((old_start, old_start + old_count - 1))
- if new_count > 0:
- current_file.additions.append((new_start, new_start + new_count - 1))
-
- i += 1
-
- return files
-
-
-def should_ignore_file(path: str) -> bool:
- """Check if a file should be ignored for overlap detection."""
- # Check exact match
- if path in IGNORE_FILES:
- return True
- # Check if basename matches (for lock files in any directory)
- basename = path.split("/")[-1]
- if basename in IGNORE_FILES:
- return True
- return False
-
-
def get_pr_diff(pr_number: int) -> str:
"""Get the diff for a PR."""
result = run_gh(["pr", "diff", str(pr_number)])
return result.stdout
-def ranges_overlap(range_a: tuple[int, int], range_b: tuple[int, int]) -> bool:
- """Check if two line ranges overlap."""
- return range_a[0] <= range_b[1] and range_b[0] <= range_a[1]
+def post_or_update_comment(pr_number: int, body: str):
+ """Post a new comment or update existing overlap detection comment."""
+ if body:
+ run_gh(["pr", "comment", str(pr_number), "--body", body])
-def find_line_overlaps(
- changes_a: dict[str, ChangedFile],
- changes_b: dict[str, ChangedFile],
- shared_files: list[str]
-) -> dict[str, list[tuple[int, int]]]:
- """Find overlapping line ranges in shared files."""
- overlaps = {}
+def send_discord_notification(webhook_url: str, pr: "PullRequest", overlaps: list["Overlap"]):
+ """Send a Discord notification about significant overlaps."""
+ conflicts = [o for o in overlaps if o.has_merge_conflict]
+ if not conflicts:
+ return
- for file_path in shared_files:
- # Skip ignored files
- if should_ignore_file(file_path):
- continue
-
- file_a = changes_a.get(file_path)
- file_b = changes_b.get(file_path)
-
- if not file_a or not file_b:
- continue
-
- # If either PR only renamed the file (no actual line changes), skip
- if file_a.is_rename and not file_a.additions and not file_a.deletions:
- continue
- if file_b.is_rename and not file_b.additions and not file_b.deletions:
- continue
-
- file_overlaps = []
-
- # Compare all range combinations (additions and deletions both matter)
- all_ranges_a = file_a.additions + file_a.deletions
- all_ranges_b = file_b.additions + file_b.deletions
-
- for range_a in all_ranges_a:
- for range_b in all_ranges_b:
- if ranges_overlap(range_a, range_b):
- # Record the overlapping region
- overlap_start = max(range_a[0], range_b[0])
- overlap_end = min(range_a[1], range_b[1])
- file_overlaps.append((overlap_start, overlap_end))
-
- # Deduplicate and merge overlapping ranges
- if file_overlaps:
- file_overlaps = merge_ranges(file_overlaps)
- overlaps[file_path] = file_overlaps
-
- return overlaps
-
-
-def merge_ranges(ranges: list[tuple[int, int]]) -> list[tuple[int, int]]:
- """Merge overlapping line ranges."""
- if not ranges:
- return []
-
- # Sort by start line
- sorted_ranges = sorted(ranges, key=lambda x: x[0])
- merged = [sorted_ranges[0]]
-
- for current in sorted_ranges[1:]:
- last = merged[-1]
- if current[0] <= last[1] + 1: # Overlapping or adjacent
- merged[-1] = (last[0], max(last[1], current[1]))
- else:
- merged.append(current)
-
- return merged
-
-
-def classify_overlap_risk(
- overlap: "Overlap",
- changes_a: dict[str, ChangedFile],
- changes_b: dict[str, ChangedFile]
-) -> str:
- """
- Classify the risk level of an overlap.
- Returns: 'conflict', 'high', 'medium', 'low'
- """
- if overlap.has_merge_conflict:
- return 'conflict'
-
- # Check if either PR involves a rename of shared files
- has_rename = False
- for file_path in overlap.overlapping_files:
- file_a = changes_a.get(file_path)
- file_b = changes_b.get(file_path)
- if (file_a and file_a.is_rename) or (file_b and file_b.is_rename):
- has_rename = True
- break
-
- if overlap.line_overlaps:
- # Count total overlapping lines
- total_overlap_lines = 0
- for ranges in overlap.line_overlaps.values():
- for start, end in ranges:
- total_overlap_lines += (end - start + 1)
-
- if total_overlap_lines > 20:
- return 'high'
- elif total_overlap_lines > 5:
- return 'medium'
- else:
- return 'low'
-
- # File overlap only (no line overlap)
- if has_rename:
- return 'medium' # Rename + edit can cause issues
-
- return 'low'
-
-
-def analyze_conflict_markers(file_path: str, cwd: str) -> ConflictInfo:
- """Analyze a conflicted file to count conflict regions and lines."""
- info = ConflictInfo(path=file_path)
-
- try:
- full_path = os.path.join(cwd, file_path)
- with open(full_path, 'r', errors='ignore') as f:
- content = f.read()
-
- lines = content.split('\n')
- in_conflict = False
- current_conflict_lines = 0
-
- for line in lines:
- if line.startswith('<<<<<<<'):
- in_conflict = True
- info.conflict_count += 1
- current_conflict_lines = 1
- elif line.startswith('>>>>>>>'):
- in_conflict = False
- current_conflict_lines += 1
- info.conflict_lines += current_conflict_lines
- elif in_conflict:
- current_conflict_lines += 1
- except:
- pass
-
- return info
-
-
-def test_merge_conflict(
- owner: str,
- repo: str,
- base_branch: str,
- pr_a: PullRequest,
- pr_b: PullRequest
-) -> tuple[bool, list[str], list[ConflictInfo], str]:
- """
- Test if merging both PRs would cause a conflict.
- Returns: (has_conflict, conflict_files, conflict_details, error_type)
- error_type can be: None, 'pr_a_conflicts_base', 'conflict'
- """
- with tempfile.TemporaryDirectory() as tmpdir:
- # Clone with more depth to handle merges properly
- clone_url = f"https://github.com/{owner}/{repo}.git"
- result = run_git(
- ["clone", "--depth=50", "--branch", base_branch, clone_url, tmpdir],
- check=False
- )
- if result.returncode != 0:
- print(f"Failed to clone: {result.stderr}", file=sys.stderr)
- return False, [], [], None
-
- # Configure git for commits
- run_git(["config", "user.email", "otto@agpt.co"], cwd=tmpdir, check=False)
- run_git(["config", "user.name", "Otto"], cwd=tmpdir, check=False)
-
- # Fetch both PR branches
- run_git(["fetch", "origin", f"pull/{pr_a.number}/head:pr-{pr_a.number}"], cwd=tmpdir, check=False)
- run_git(["fetch", "origin", f"pull/{pr_b.number}/head:pr-{pr_b.number}"], cwd=tmpdir, check=False)
-
- # Try merging PR A (the current PR) first
- result = run_git(["merge", "--no-commit", "--no-ff", f"pr-{pr_a.number}"], cwd=tmpdir, check=False)
- if result.returncode != 0:
- # PR A itself has conflicts with base - this is important to flag!
- status_result = run_git(["status", "--porcelain"], cwd=tmpdir, check=False)
- conflict_files = []
- conflict_details = []
- for line in status_result.stdout.split("\n"):
- if len(line) >= 3 and line[0:2] in ['UU', 'AA', 'DD', 'DU', 'UD', 'AU', 'UA']:
- file_path = line[3:].strip()
- conflict_files.append(file_path)
- # Analyze conflict markers
- info = analyze_conflict_markers(file_path, tmpdir)
- conflict_details.append(info)
- run_git(["merge", "--abort"], cwd=tmpdir, check=False)
- return True, conflict_files, conflict_details, 'pr_a_conflicts_base'
-
- # Commit the merge
- run_git(["commit", "-m", f"Merge PR #{pr_a.number}"], cwd=tmpdir, check=False)
-
- # Try merging PR B
- result = run_git(["merge", "--no-commit", "--no-ff", f"pr-{pr_b.number}"], cwd=tmpdir, check=False)
-
- if result.returncode != 0:
- # Conflict detected between A and B!
- status_result = run_git(["status", "--porcelain"], cwd=tmpdir, check=False)
- conflict_files = []
- conflict_details = []
-
- # Map git status codes to conflict types
- status_types = {
- 'UU': 'content', # Both modified
- 'AA': 'both_added', # Both added
- 'DD': 'both_deleted',
- 'DU': 'deleted_by_us',
- 'UD': 'deleted_by_them',
- 'AU': 'added_by_us',
- 'UA': 'added_by_them',
+ embed = {
+ "title": f"⚠️ PR #{pr.number} has merge conflicts",
+ "description": f"[{pr.title}]({pr.url})",
+ "color": 0xFF0000,
+ "fields": [
+ {
+ "name": f"Conflicts with #{(o.pr_b if o.pr_a.number == pr.number else o.pr_a).number}",
+ "value": f"[{(o.pr_b if o.pr_a.number == pr.number else o.pr_a).title}]({(o.pr_b if o.pr_a.number == pr.number else o.pr_a).url})\nFiles: `{'`, `'.join(o.conflict_files[:3])}`",
+ "inline": False
}
-
- for line in status_result.stdout.split("\n"):
- # Various conflict markers in git status
- if len(line) >= 3 and line[0:2] in status_types:
- status_code = line[0:2]
- file_path = line[3:].strip()
- conflict_files.append(file_path)
- # Analyze conflict markers
- info = analyze_conflict_markers(file_path, tmpdir)
- info.conflict_type = status_types.get(status_code, 'unknown')
- conflict_details.append(info)
-
- # If no files found via status, try to get them from the merge output
- if not conflict_files and result.stderr:
- for line in result.stderr.split("\n"):
- if "CONFLICT" in line and ":" in line:
- # Extract file path from conflict message
- parts = line.split(":")
- if len(parts) > 1:
- file_part = parts[-1].strip()
- if file_part and not file_part.startswith("Merge"):
- conflict_files.append(file_part)
- conflict_details.append(ConflictInfo(path=file_part))
-
- run_git(["merge", "--abort"], cwd=tmpdir, check=False)
- return True, conflict_files, conflict_details, 'conflict'
-
- return False, [], [], None
+ for o in conflicts
+ ]
+ }
+
+ subprocess.run(
+ ["curl", "-X", "POST", "-H", "Content-Type: application/json",
+ "-d", json.dumps({"embeds": [embed]}), webhook_url],
+ capture_output=True
+ )
+
+
+# =============================================================================
+# UTILITIES
+# =============================================================================
+
+def run_gh(args: list[str], check: bool = True) -> subprocess.CompletedProcess:
+ """Run a gh CLI command."""
+ result = subprocess.run(
+ ["gh"] + args,
+ capture_output=True,
+ text=True,
+ check=False
+ )
+ if check and result.returncode != 0:
+ print(f"Error running gh {' '.join(args)}: {result.stderr}", file=sys.stderr)
+ sys.exit(1)
+ return result
+
+
+def run_git(args: list[str], cwd: str = None, check: bool = True) -> subprocess.CompletedProcess:
+ """Run a git command."""
+ result = subprocess.run(
+ ["git"] + args,
+ capture_output=True,
+ text=True,
+ cwd=cwd,
+ check=False
+ )
+ if check and result.returncode != 0:
+ print(f"Error running git {' '.join(args)}: {result.stderr}", file=sys.stderr)
+ return result
+
+
+def should_ignore_file(path: str) -> bool:
+ """Check if a file should be ignored for overlap detection."""
+ if path in IGNORE_FILES:
+ return True
+ basename = path.split("/")[-1]
+ return basename in IGNORE_FILES
def find_common_prefix(paths: list[str]) -> str:
@@ -515,14 +941,10 @@ def find_common_prefix(paths: list[str]) -> str:
if not paths:
return ""
if len(paths) == 1:
- # For single file, use the directory
parts = paths[0].rsplit('/', 1)
return parts[0] + '/' if len(parts) > 1 else ""
- # Split all paths into parts
split_paths = [p.split('/') for p in paths]
-
- # Find common prefix parts
common = []
for parts in zip(*split_paths):
if len(set(parts)) == 1:
@@ -534,13 +956,12 @@ def find_common_prefix(paths: list[str]) -> str:
def format_relative_time(iso_timestamp: str) -> str:
- """Format an ISO timestamp as relative time (e.g., '2 hours ago')."""
+ """Format an ISO timestamp as relative time."""
if not iso_timestamp:
return ""
from datetime import datetime, timezone
try:
- # Parse ISO timestamp
dt = datetime.fromisoformat(iso_timestamp.replace('Z', '+00:00'))
now = datetime.now(timezone.utc)
diff = now - dt
@@ -549,383 +970,87 @@ def format_relative_time(iso_timestamp: str) -> str:
if seconds < 60:
return "just now"
elif seconds < 3600:
- mins = int(seconds / 60)
- return f"{mins}m ago"
+ return f"{int(seconds / 60)}m ago"
elif seconds < 86400:
- hours = int(seconds / 3600)
- return f"{hours}h ago"
+ return f"{int(seconds / 3600)}h ago"
else:
- days = int(seconds / 86400)
- return f"{days}d ago"
+ return f"{int(seconds / 86400)}d ago"
except:
return ""
-def format_comment(overlaps: list[Overlap], current_pr: int, changes_current: dict[str, ChangedFile], all_changes: dict[int, dict[str, ChangedFile]]) -> str:
- """Format the overlap report as a PR comment."""
- if not overlaps:
- return ""
-
- lines = ["## 🔍 PR Overlap Detection"]
- lines.append("")
- lines.append("This check compares your PR against all other open PRs targeting the same branch to detect potential merge conflicts early.")
- lines.append("")
-
- # Check if current PR conflicts with base branch
- base_conflicts = [o for o in overlaps if o.conflict_type == 'pr_a_conflicts_base']
- if base_conflicts:
- lines.append("### ⚠️ This PR has conflicts with the base branch\n")
- lines.append("Conflicts will need to be resolved before merging:\n")
- # Just show the first one since they'll all report the same base conflict
- first = base_conflicts[0]
- for f in first.conflict_files[:10]:
- lines.append(f"- `{f}`")
- if len(first.conflict_files) > 10:
- lines.append(f"- ... and {len(first.conflict_files) - 10} more files")
- lines.append("\n")
-
- # Classify each overlap
- classified = []
- for o in overlaps:
- other_pr = o.pr_b if o.pr_a.number == current_pr else o.pr_a
- other_changes = all_changes.get(other_pr.number, {})
- risk = classify_overlap_risk(o, changes_current, other_changes)
- classified.append((o, risk))
-
- # Sort by risk level
- risk_order = {'conflict': 0, 'high': 1, 'medium': 2, 'low': 3}
- classified.sort(key=lambda x: risk_order.get(x[1], 99))
-
- # Group by risk
- conflicts = [(o, r) for o, r in classified if r == 'conflict']
- high_risk = [(o, r) for o, r in classified if r == 'high']
- medium_risk = [(o, r) for o, r in classified if r == 'medium']
- low_risk = [(o, r) for o, r in classified if r == 'low']
-
- # Filter out base conflicts from the PR-to-PR conflicts
- pr_conflicts = [(o, r) for o, r in conflicts if o.conflict_type != 'pr_a_conflicts_base']
-
- if pr_conflicts:
- lines.append("### 🔴 Merge Conflicts Detected")
- lines.append("")
- lines.append("The following PRs have been tested and **will have merge conflicts** if merged after this PR. Consider coordinating with the authors.")
- lines.append("")
- for o, _ in pr_conflicts:
- other = o.pr_b if o.pr_a.number == current_pr else o.pr_a
- updated = format_relative_time(other.updated_at)
- updated_str = f" · updated {updated}" if updated else ""
- lines.append(f"- **#{other.number}** ({other.author}{updated_str}): [{other.title}]({other.url})")
-
- # Show conflict details with sizes - no truncation
- if o.conflict_details:
- all_paths = [d.path for d in o.conflict_details]
- common_prefix = find_common_prefix(all_paths)
- if common_prefix:
- lines.append(f" - 📁 `{common_prefix}`")
- for detail in o.conflict_details:
- # Remove common prefix for display
- display_path = detail.path[len(common_prefix):] if common_prefix else detail.path
- size_str = ""
- if detail.conflict_count > 0:
- size_str = f" ({detail.conflict_count} conflict{'s' if detail.conflict_count > 1 else ''}, ~{detail.conflict_lines} lines)"
- elif detail.conflict_type != 'content':
- # Show the conflict type if no content markers found
- type_labels = {
- 'both_added': 'added in both',
- 'both_deleted': 'deleted in both',
- 'deleted_by_us': 'deleted here, modified there',
- 'deleted_by_them': 'modified here, deleted there',
- 'added_by_us': 'added here',
- 'added_by_them': 'added there',
- }
- label = type_labels.get(detail.conflict_type, detail.conflict_type)
- size_str = f" ({label})"
- lines.append(f" - `{display_path}`{size_str}")
- elif o.conflict_files:
- # Fallback to just file names - no truncation
- common_prefix = find_common_prefix(o.conflict_files)
- if common_prefix:
- lines.append(f" - 📁 `{common_prefix}`")
- for f in o.conflict_files:
- display_path = f[len(common_prefix):] if common_prefix else f
- lines.append(f" - `{display_path}`")
- lines.append("")
-
- if high_risk:
- lines.append("### 🟠 High Risk — Significant Line Overlap")
- lines.append("")
- lines.append("These PRs modify many of the same lines (>20 lines). While not yet tested for conflicts, they have high potential to conflict.")
- lines.append("")
- for o, _ in high_risk:
- other = o.pr_b if o.pr_a.number == current_pr else o.pr_a
- updated = format_relative_time(other.updated_at)
- updated_str = f" · updated {updated}" if updated else ""
- lines.append(f"- **#{other.number}** ({other.author}{updated_str}): [{other.title}]({other.url})")
- all_paths = list(o.line_overlaps.keys())
- common_prefix = find_common_prefix(all_paths) if len(all_paths) > 1 else ""
- if common_prefix:
- lines.append(f" - 📁 `{common_prefix}`")
- for file_path, ranges in o.line_overlaps.items():
- display_path = file_path[len(common_prefix):] if common_prefix else file_path
- range_strs = [f"L{r[0]}-{r[1]}" if r[0] != r[1] else f"L{r[0]}" for r in ranges]
- indent = " " if common_prefix else " "
- lines.append(f"{indent}- `{display_path}`: {', '.join(range_strs)}")
- lines.append("")
-
- if medium_risk:
- lines.append("### 🟡 Medium Risk — Some Line Overlap\n")
- lines.append("These PRs have some overlapping changes:\n")
- for o, _ in medium_risk:
- other = o.pr_b if o.pr_a.number == current_pr else o.pr_a
- other_changes = all_changes.get(other.number, {})
- updated = format_relative_time(other.updated_at)
- updated_str = f" · updated {updated}" if updated else ""
- lines.append(f"- **#{other.number}** ({other.author}{updated_str}): [{other.title}]({other.url})")
-
- # Note if rename is involved
- for file_path in o.overlapping_files:
- file_a = changes_current.get(file_path)
- file_b = other_changes.get(file_path)
- if (file_a and file_a.is_rename) or (file_b and file_b.is_rename):
- lines.append(f" - ⚠️ `{file_path}` is being renamed/moved")
- break
-
- if o.line_overlaps:
- for file_path, ranges in o.line_overlaps.items():
- range_strs = [f"L{r[0]}-{r[1]}" if r[0] != r[1] else f"L{r[0]}" for r in ranges]
- lines.append(f" - `{file_path}`: {', '.join(range_strs)}")
- else:
- non_ignored = [f for f in o.overlapping_files if not should_ignore_file(f)]
- if non_ignored:
- lines.append(f" - Shared files: `{'`, `'.join(non_ignored[:5])}`")
- lines.append("")
-
- if low_risk:
- lines.append("### 🟢 Low Risk — File Overlap Only\n")
- lines.append("These PRs touch the same files but different sections (click to expand)
\n")
- for o, _ in low_risk:
- other = o.pr_b if o.pr_a.number == current_pr else o.pr_a
- non_ignored = [f for f in o.overlapping_files if not should_ignore_file(f)]
- if non_ignored: # Only show if there are non-ignored files
- updated = format_relative_time(other.updated_at)
- updated_str = f" · updated {updated}" if updated else ""
- lines.append(f"- **#{other.number}** ({other.author}{updated_str}): [{other.title}]({other.url})")
- if o.line_overlaps:
- for file_path, ranges in o.line_overlaps.items():
- range_strs = [f"L{r[0]}-{r[1]}" if r[0] != r[1] else f"L{r[0]}" for r in ranges]
- lines.append(f" - `{file_path}`: {', '.join(range_strs)}")
- else:
- lines.append(f" - Shared files: `{'`, `'.join(non_ignored[:5])}`")
- lines.append("\n \n")
-
- # Summary
- total = len(overlaps)
- lines.append(f"\n**Summary:** {len(conflicts)} conflicts, {len(high_risk)} high risk, {len(medium_risk)} medium risk, {len(low_risk)} low risk (out of {total} PRs with file overlap)")
- lines.append("\n---\n*Auto-generated on push. Ignores: `openapi.json`, lock files.*")
-
- return "\n".join(lines)
+# =============================================================================
+# DATA CLASSES
+# =============================================================================
+
+@dataclass
+class ChangedFile:
+ """Represents a file changed in a PR."""
+ path: str
+ additions: list[tuple[int, int]]
+ deletions: list[tuple[int, int]]
+ is_rename: bool = False
+ old_path: str = None
-def post_or_update_comment(pr_number: int, body: str):
- """Post a new comment or update existing overlap detection comment."""
- # Check for existing comment
- result = run_gh(["pr", "view", str(pr_number), "--json", "comments"])
- data = json.loads(result.stdout)
-
- marker = "## 🔍 PR Overlap Detection"
- existing_comment_id = None
-
- for comment in data.get("comments", []):
- if marker in comment.get("body", ""):
- # Extract comment ID from the comment data
- # gh pr view doesn't give us the ID directly, so we need to use the API
- break
-
- # For now, just post a new comment (we can improve this later to update existing)
- if body:
- run_gh(["pr", "comment", str(pr_number), "--body", body])
+@dataclass
+class PullRequest:
+ """Represents a pull request."""
+ number: int
+ title: str
+ author: str
+ url: str
+ head_ref: str
+ base_ref: str
+ files: list[str]
+ changed_ranges: dict[str, ChangedFile]
+ updated_at: str = None
-def send_discord_notification(webhook_url: str, pr: PullRequest, overlaps: list[Overlap]):
- """Send a Discord notification about significant overlaps."""
- if not webhook_url or not overlaps:
- return
-
- conflicts = [o for o in overlaps if o.has_merge_conflict]
- if not conflicts:
- return # Only notify for actual conflicts
-
- # Build Discord embed
- embed = {
- "title": f"⚠️ PR #{pr.number} has merge conflicts",
- "description": f"[{pr.title}]({pr.url})",
- "color": 0xFF0000, # Red
- "fields": []
- }
-
- for o in conflicts:
- other = o.pr_b if o.pr_a.number == pr.number else o.pr_a
- embed["fields"].append({
- "name": f"Conflicts with #{other.number}",
- "value": f"[{other.title}]({other.url})\nFiles: `{'`, `'.join(o.conflict_files[:3])}`",
- "inline": False
- })
-
- payload = {"embeds": [embed]}
-
- # Use curl to send (avoiding extra dependencies)
- subprocess.run(
- ["curl", "-X", "POST", "-H", "Content-Type: application/json",
- "-d", json.dumps(payload), webhook_url],
- capture_output=True
- )
+@dataclass
+class ConflictInfo:
+ """Info about a single conflicting file."""
+ path: str
+ conflict_count: int = 0
+ conflict_lines: int = 0
+ conflict_type: str = "content"
-def main():
- import argparse
-
- parser = argparse.ArgumentParser(description="Detect PR overlaps and potential merge conflicts")
- parser.add_argument("pr_number", type=int, help="PR number to check")
- parser.add_argument("--base", default=None, help="Base branch (default: auto-detect from PR)")
- parser.add_argument("--skip-merge-test", action="store_true", help="Skip actual merge conflict testing")
- parser.add_argument("--discord-webhook", default=os.environ.get("DISCORD_WEBHOOK_URL"), help="Discord webhook URL for notifications")
- parser.add_argument("--dry-run", action="store_true", help="Don't post comments, just print")
-
- args = parser.parse_args()
-
- owner, repo = get_repo_info()
- print(f"Checking PR #{args.pr_number} in {owner}/{repo}")
-
- # Get current PR info
- result = run_gh(["pr", "view", str(args.pr_number), "--json", "number,title,url,author,headRefName,baseRefName,files"])
- current_pr_data = json.loads(result.stdout)
-
- base_branch = args.base or current_pr_data["baseRefName"]
-
- current_pr = PullRequest(
- number=current_pr_data["number"],
- title=current_pr_data["title"],
- author=current_pr_data["author"]["login"],
- url=current_pr_data["url"],
- head_ref=current_pr_data["headRefName"],
- base_ref=base_branch,
- files=[f["path"] for f in current_pr_data["files"]],
- changed_ranges={}
- )
-
- print(f"PR #{current_pr.number}: {current_pr.title}")
- print(f"Base branch: {base_branch}")
- print(f"Files changed: {len(current_pr.files)}")
-
- # Query other open PRs
- all_prs = query_open_prs(owner, repo, base_branch)
- other_prs = [p for p in all_prs if p["number"] != args.pr_number]
-
- print(f"Found {len(other_prs)} other open PRs targeting {base_branch}")
-
- # Find file overlaps (excluding ignored files)
- current_files = set(f for f in current_pr.files if not should_ignore_file(f))
- candidates = []
-
- for pr_data in other_prs:
- other_files = set(f for f in pr_data["files"] if not should_ignore_file(f))
- shared = current_files & other_files
-
- if shared:
- candidates.append((pr_data, list(shared)))
-
- print(f"Found {len(candidates)} PRs with file overlap (excluding ignored files)")
-
- if not candidates:
- print("No overlaps detected!")
- return
-
- # Get detailed diff for current PR
- current_diff = get_pr_diff(args.pr_number)
- current_pr.changed_ranges = parse_diff_ranges(current_diff)
-
- overlaps = []
- all_changes = {} # Store all PR changes for risk classification
-
- for pr_data, shared_files in candidates:
- # Filter out ignored files
- non_ignored_shared = [f for f in shared_files if not should_ignore_file(f)]
- if not non_ignored_shared:
- continue # Skip if all shared files are ignored
-
- other_pr = PullRequest(
- number=pr_data["number"],
- title=pr_data["title"],
- author=pr_data["author"],
- url=pr_data["url"],
- head_ref=pr_data["head_ref"],
- base_ref=pr_data["base_ref"],
- files=pr_data["files"],
- changed_ranges={},
- updated_at=pr_data.get("updated_at")
- )
-
- # Get diff for other PR
- other_diff = get_pr_diff(other_pr.number)
- other_pr.changed_ranges = parse_diff_ranges(other_diff)
- all_changes[other_pr.number] = other_pr.changed_ranges
-
- # Check line overlaps (now filters ignored files internally)
- line_overlaps = find_line_overlaps(
- current_pr.changed_ranges,
- other_pr.changed_ranges,
- shared_files
- )
-
- overlap = Overlap(
- pr_a=current_pr,
- pr_b=other_pr,
- overlapping_files=non_ignored_shared,
- line_overlaps=line_overlaps
- )
-
- # Test for actual merge conflicts if we have line overlaps
- if line_overlaps and not args.skip_merge_test:
- print(f"Testing merge conflict with PR #{other_pr.number}...", flush=True)
- has_conflict, conflict_files, conflict_details, error_type = test_merge_conflict(
- owner, repo, base_branch, current_pr, other_pr
- )
- overlap.has_merge_conflict = has_conflict
- overlap.conflict_files = conflict_files
- overlap.conflict_details = conflict_details
- overlap.conflict_type = error_type
-
- overlaps.append(overlap)
-
- # Generate report
- comment = format_comment(overlaps, args.pr_number, current_pr.changed_ranges, all_changes)
-
- if args.dry_run:
- print("\n" + "="*60)
- print("COMMENT PREVIEW:")
- print("="*60)
- print(comment)
- else:
- if comment:
- post_or_update_comment(args.pr_number, comment)
- print("Posted comment to PR")
-
- # Discord notification for conflicts
- if args.discord_webhook:
- send_discord_notification(args.discord_webhook, current_pr, overlaps)
-
- # Exit with non-zero if conflicts found (for CI)
- conflicts = [o for o in overlaps if o.has_merge_conflict]
- if conflicts:
- print(f"\n⚠️ Found {len(conflicts)} merge conflict(s)")
- sys.exit(1)
-
- line_overlap_count = len([o for o in overlaps if o.line_overlaps])
- if line_overlap_count:
- print(f"\n⚠️ Found {line_overlap_count} PR(s) with line overlap")
-
- print("\n✅ Done")
+@dataclass
+class Overlap:
+ """Represents an overlap between two PRs."""
+ pr_a: PullRequest
+ pr_b: PullRequest
+ overlapping_files: list[str]
+ line_overlaps: dict[str, list[tuple[int, int]]]
+ has_merge_conflict: bool = False
+ conflict_files: list[str] = None
+ conflict_details: list[ConflictInfo] = None
+ conflict_type: str = None
+ def __post_init__(self):
+ if self.conflict_files is None:
+ self.conflict_files = []
+ if self.conflict_details is None:
+ self.conflict_details = []
+
+
+# =============================================================================
+# CONSTANTS
+# =============================================================================
+
+IGNORE_FILES = {
+ "autogpt_platform/frontend/src/app/api/openapi.json",
+ "poetry.lock",
+ "pnpm-lock.yaml",
+ "package-lock.json",
+ "yarn.lock",
+}
+
+
+# =============================================================================
+# ENTRY POINT
+# =============================================================================
if __name__ == "__main__":
main()