mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-19 18:39:20 -05:00
560 lines
20 KiB
Python
560 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Lists the longest and shortest code files in the project, and counts duplicated function names across files. Useful for identifying potential refactoring targets and enforcing code size guidelines.
|
|
Threshold can be set to warn about files longer or shorter than a certain number of lines.
|
|
|
|
CI mode (--compare-to): Only warns about files that grew past threshold compared to a base ref.
|
|
Use --strict to exit non-zero on violations for CI gating.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import subprocess
|
|
import argparse
|
|
from pathlib import Path
|
|
from typing import List, Tuple, Dict, Set, Optional
|
|
from collections import defaultdict
|
|
|
|
# File extensions to consider as code files
|
|
CODE_EXTENSIONS = {
|
|
'.ts', '.tsx', '.js', '.jsx', '.mjs', '.cjs', # TypeScript/JavaScript
|
|
'.swift', # macOS/iOS
|
|
'.kt', '.java', # Android
|
|
'.py', '.sh', # Scripts
|
|
}
|
|
|
|
# Directories to skip
|
|
SKIP_DIRS = {
|
|
'node_modules', '.git', 'dist', 'build', 'coverage',
|
|
'__pycache__', '.turbo', 'out', '.worktrees', 'vendor',
|
|
'Pods', 'DerivedData', '.gradle', '.idea',
|
|
'Swabble', # Separate Swift package
|
|
'skills', # Standalone skill scripts
|
|
'.pi', # Pi editor extensions
|
|
}
|
|
|
|
# Filename patterns to skip in short-file warnings (barrel exports, stubs)
|
|
SKIP_SHORT_PATTERNS = {
|
|
'index.js', 'index.ts', 'postinstall.js',
|
|
}
|
|
SKIP_SHORT_SUFFIXES = ('-cli.ts',)
|
|
|
|
# Function names to skip in duplicate detection (common utilities, test helpers)
|
|
SKIP_DUPLICATE_FUNCTIONS = {
|
|
# Common utility names
|
|
'main', 'init', 'setup', 'teardown', 'cleanup', 'dispose', 'destroy',
|
|
'open', 'close', 'connect', 'disconnect', 'execute', 'run', 'start', 'stop',
|
|
'render', 'update', 'refresh', 'reset', 'clear', 'flush',
|
|
}
|
|
|
|
SKIP_DUPLICATE_PREFIXES = (
|
|
# Transformers
|
|
'normalize', 'parse', 'validate', 'serialize', 'deserialize',
|
|
'convert', 'transform', 'extract', 'encode', 'decode',
|
|
# Predicates
|
|
'is', 'has', 'can', 'should', 'will',
|
|
# Constructors/factories
|
|
'create', 'make', 'build', 'generate', 'new',
|
|
# Accessors
|
|
'get', 'set', 'read', 'write', 'load', 'save', 'fetch',
|
|
# Handlers
|
|
'handle', 'on', 'emit',
|
|
# Modifiers
|
|
'add', 'remove', 'delete', 'update', 'insert', 'append',
|
|
# Other common
|
|
'to', 'from', 'with', 'apply', 'process', 'resolve', 'ensure', 'check',
|
|
'filter', 'map', 'reduce', 'merge', 'split', 'join', 'find', 'search',
|
|
'register', 'unregister', 'subscribe', 'unsubscribe',
|
|
)
|
|
SKIP_DUPLICATE_FILE_PATTERNS = ('.test.ts', '.test.tsx', '.spec.ts')
|
|
|
|
# Known packages in the monorepo
|
|
PACKAGES = {
|
|
'src', 'apps', 'extensions', 'packages', 'scripts', 'ui', 'test', 'docs'
|
|
}
|
|
|
|
|
|
def get_package(file_path: Path, root_dir: Path) -> str:
|
|
"""Get the package name for a file, or 'root' if at top level."""
|
|
try:
|
|
relative = file_path.relative_to(root_dir)
|
|
parts = relative.parts
|
|
if len(parts) > 0 and parts[0] in PACKAGES:
|
|
return parts[0]
|
|
return 'root'
|
|
except ValueError:
|
|
return 'root'
|
|
|
|
|
|
def count_lines(file_path: Path) -> int:
|
|
"""Count the number of lines in a file."""
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
|
|
return sum(1 for _ in f)
|
|
except Exception:
|
|
return 0
|
|
|
|
|
|
def find_code_files(root_dir: Path) -> List[Tuple[Path, int]]:
|
|
"""Find all code files and their line counts."""
|
|
files_with_counts = []
|
|
|
|
for dirpath, dirnames, filenames in os.walk(root_dir):
|
|
# Remove skip directories from dirnames to prevent walking into them
|
|
dirnames[:] = [d for d in dirnames if d not in SKIP_DIRS]
|
|
|
|
for filename in filenames:
|
|
file_path = Path(dirpath) / filename
|
|
if file_path.suffix.lower() in CODE_EXTENSIONS:
|
|
line_count = count_lines(file_path)
|
|
files_with_counts.append((file_path, line_count))
|
|
|
|
return files_with_counts
|
|
|
|
|
|
# Regex patterns for TypeScript functions (exported and internal)
|
|
TS_FUNCTION_PATTERNS = [
|
|
# export function name(...) or function name(...)
|
|
re.compile(r'^(?:export\s+)?(?:async\s+)?function\s+(\w+)', re.MULTILINE),
|
|
# export const name = or const name =
|
|
re.compile(r'^(?:export\s+)?const\s+(\w+)\s*=\s*(?:\([^)]*\)|\w+)\s*=>', re.MULTILINE),
|
|
]
|
|
|
|
|
|
def extract_functions(file_path: Path) -> Set[str]:
|
|
"""Extract function names from a TypeScript file."""
|
|
if file_path.suffix.lower() not in {'.ts', '.tsx'}:
|
|
return set()
|
|
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
|
|
content = f.read()
|
|
except Exception:
|
|
return set()
|
|
|
|
return extract_functions_from_content(content)
|
|
|
|
|
|
def find_duplicate_functions(files: List[Tuple[Path, int]], root_dir: Path) -> Dict[str, List[Path]]:
|
|
"""Find function names that appear in multiple files."""
|
|
function_locations: Dict[str, List[Path]] = defaultdict(list)
|
|
|
|
for file_path, _ in files:
|
|
# Skip test files for duplicate detection
|
|
if any(file_path.name.endswith(pat) for pat in SKIP_DUPLICATE_FILE_PATTERNS):
|
|
continue
|
|
|
|
functions = extract_functions(file_path)
|
|
for func in functions:
|
|
# Skip known common function names
|
|
if func in SKIP_DUPLICATE_FUNCTIONS:
|
|
continue
|
|
if any(func.startswith(prefix) for prefix in SKIP_DUPLICATE_PREFIXES):
|
|
continue
|
|
function_locations[func].append(file_path)
|
|
|
|
# Filter to only duplicates
|
|
return {name: paths for name, paths in function_locations.items() if len(paths) > 1}
|
|
|
|
|
|
def validate_git_ref(root_dir: Path, ref: str) -> bool:
|
|
"""Validate that a git ref exists. Exits with error if not."""
|
|
try:
|
|
result = subprocess.run(
|
|
['git', 'rev-parse', '--verify', ref],
|
|
capture_output=True,
|
|
cwd=root_dir,
|
|
encoding='utf-8',
|
|
)
|
|
return result.returncode == 0
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def get_file_content_at_ref(file_path: Path, root_dir: Path, ref: str) -> Optional[str]:
|
|
"""Get content of a file at a specific git ref. Returns None if file doesn't exist at ref."""
|
|
try:
|
|
relative_path = file_path.relative_to(root_dir)
|
|
# Use forward slashes for git paths
|
|
git_path = str(relative_path).replace('\\', '/')
|
|
result = subprocess.run(
|
|
['git', 'show', f'{ref}:{git_path}'],
|
|
capture_output=True,
|
|
cwd=root_dir,
|
|
encoding='utf-8',
|
|
errors='ignore',
|
|
)
|
|
if result.returncode != 0:
|
|
stderr = result.stderr.strip()
|
|
# "does not exist" or "exists on disk, but not in" = file missing at ref (OK)
|
|
if 'does not exist' in stderr or 'exists on disk' in stderr:
|
|
return None
|
|
# Other errors (bad ref, git broken) = genuine failure
|
|
if stderr:
|
|
print(f"⚠️ git show error for {git_path}: {stderr}", file=sys.stderr)
|
|
return None
|
|
return result.stdout
|
|
except Exception as e:
|
|
print(f"⚠️ failed to read {file_path} at {ref}: {e}", file=sys.stderr)
|
|
return None
|
|
|
|
|
|
def get_line_count_at_ref(file_path: Path, root_dir: Path, ref: str) -> Optional[int]:
|
|
"""Get line count of a file at a specific git ref. Returns None if file doesn't exist at ref."""
|
|
content = get_file_content_at_ref(file_path, root_dir, ref)
|
|
if content is None:
|
|
return None
|
|
return len(content.splitlines())
|
|
|
|
|
|
def extract_functions_from_content(content: str) -> Set[str]:
|
|
"""Extract function names from TypeScript content string."""
|
|
functions = set()
|
|
for pattern in TS_FUNCTION_PATTERNS:
|
|
for match in pattern.finditer(content):
|
|
functions.add(match.group(1))
|
|
return functions
|
|
|
|
|
|
def get_changed_files(root_dir: Path, compare_ref: str) -> Set[str]:
|
|
"""Get set of files changed between compare_ref and HEAD (relative paths with forward slashes)."""
|
|
try:
|
|
result = subprocess.run(
|
|
['git', 'diff', '--name-only', compare_ref, 'HEAD'],
|
|
capture_output=True,
|
|
cwd=root_dir,
|
|
encoding='utf-8',
|
|
errors='ignore',
|
|
)
|
|
if result.returncode != 0:
|
|
return set()
|
|
return {line.strip() for line in result.stdout.splitlines() if line.strip()}
|
|
except Exception:
|
|
return set()
|
|
|
|
|
|
def find_duplicate_regressions(
|
|
files: List[Tuple[Path, int]],
|
|
root_dir: Path,
|
|
compare_ref: str,
|
|
) -> Dict[str, List[Path]]:
|
|
"""
|
|
Find new duplicate function names that didn't exist at the base ref.
|
|
Only checks functions in files that changed to keep CI fast.
|
|
Returns dict of function_name -> list of current file paths, only for
|
|
duplicates that are new (weren't duplicated at compare_ref).
|
|
"""
|
|
# Build current duplicate map
|
|
current_dupes = find_duplicate_functions(files, root_dir)
|
|
if not current_dupes:
|
|
return {}
|
|
|
|
# Get changed files to scope the comparison
|
|
changed_files = get_changed_files(root_dir, compare_ref)
|
|
if not changed_files:
|
|
return {} # Nothing changed, no new duplicates possible
|
|
|
|
# Only check duplicate functions that involve at least one changed file
|
|
relevant_dupes: Dict[str, List[Path]] = {}
|
|
for func_name, paths in current_dupes.items():
|
|
involves_changed = any(
|
|
str(p.relative_to(root_dir)).replace('\\', '/') in changed_files
|
|
for p in paths
|
|
)
|
|
if involves_changed:
|
|
relevant_dupes[func_name] = paths
|
|
|
|
if not relevant_dupes:
|
|
return {}
|
|
|
|
# For relevant duplicates, check if they were already duplicated at base ref
|
|
# Only need to read base versions of files involved in these duplicates
|
|
files_to_check: Set[Path] = set()
|
|
for paths in relevant_dupes.values():
|
|
files_to_check.update(paths)
|
|
|
|
base_function_locations: Dict[str, List[Path]] = defaultdict(list)
|
|
for file_path in files_to_check:
|
|
if file_path.suffix.lower() not in {'.ts', '.tsx'}:
|
|
continue
|
|
content = get_file_content_at_ref(file_path, root_dir, compare_ref)
|
|
if content is None:
|
|
continue
|
|
functions = extract_functions_from_content(content)
|
|
for func in functions:
|
|
if func in SKIP_DUPLICATE_FUNCTIONS:
|
|
continue
|
|
if any(func.startswith(prefix) for prefix in SKIP_DUPLICATE_PREFIXES):
|
|
continue
|
|
base_function_locations[func].append(file_path)
|
|
|
|
base_dupes = {name for name, paths in base_function_locations.items() if len(paths) > 1}
|
|
|
|
# Return only new duplicates
|
|
return {name: paths for name, paths in relevant_dupes.items() if name not in base_dupes}
|
|
|
|
|
|
def find_threshold_regressions(
|
|
files: List[Tuple[Path, int]],
|
|
root_dir: Path,
|
|
compare_ref: str,
|
|
threshold: int,
|
|
) -> Tuple[List[Tuple[Path, int, Optional[int]]], List[Tuple[Path, int, int]]]:
|
|
"""
|
|
Find files that crossed the threshold or grew while already over it.
|
|
Returns two lists:
|
|
- crossed: (path, current_lines, base_lines) for files that newly crossed the threshold
|
|
- grew: (path, current_lines, base_lines) for files already over threshold that got larger
|
|
"""
|
|
crossed = []
|
|
grew = []
|
|
|
|
for file_path, current_lines in files:
|
|
if current_lines < threshold:
|
|
continue # Not over threshold now, skip
|
|
|
|
base_lines = get_line_count_at_ref(file_path, root_dir, compare_ref)
|
|
|
|
if base_lines is None or base_lines < threshold:
|
|
# New file or crossed the threshold
|
|
crossed.append((file_path, current_lines, base_lines))
|
|
elif current_lines > base_lines:
|
|
# Already over threshold and grew larger
|
|
grew.append((file_path, current_lines, base_lines))
|
|
|
|
return crossed, grew
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description='Analyze code files: list longest/shortest files, find duplicate function names'
|
|
)
|
|
parser.add_argument(
|
|
'-t', '--threshold',
|
|
type=int,
|
|
default=1000,
|
|
help='Warn about files longer than this many lines (default: 1000)'
|
|
)
|
|
parser.add_argument(
|
|
'--min-threshold',
|
|
type=int,
|
|
default=10,
|
|
help='Warn about files shorter than this many lines (default: 10)'
|
|
)
|
|
parser.add_argument(
|
|
'-n', '--top',
|
|
type=int,
|
|
default=20,
|
|
help='Show top N longest files (default: 20)'
|
|
)
|
|
parser.add_argument(
|
|
'-b', '--bottom',
|
|
type=int,
|
|
default=10,
|
|
help='Show bottom N shortest files (default: 10)'
|
|
)
|
|
parser.add_argument(
|
|
'-d', '--directory',
|
|
type=str,
|
|
default='.',
|
|
help='Directory to scan (default: current directory)'
|
|
)
|
|
parser.add_argument(
|
|
'--compare-to',
|
|
type=str,
|
|
default=None,
|
|
help='Git ref to compare against (e.g., origin/main). Only warn about files that grew past threshold.'
|
|
)
|
|
parser.add_argument(
|
|
'--strict',
|
|
action='store_true',
|
|
help='Exit with non-zero status if any violations found (for CI)'
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
root_dir = Path(args.directory).resolve()
|
|
|
|
# CI delta mode: only show regressions
|
|
if args.compare_to:
|
|
print(f"\n📂 Scanning: {root_dir}")
|
|
print(f"🔍 Comparing to: {args.compare_to}\n")
|
|
|
|
if not validate_git_ref(root_dir, args.compare_to):
|
|
print(f"❌ Invalid git ref: {args.compare_to}", file=sys.stderr)
|
|
print(" Make sure the ref exists (e.g. run 'git fetch origin <branch>')", file=sys.stderr)
|
|
sys.exit(2)
|
|
|
|
files = find_code_files(root_dir)
|
|
violations = False
|
|
|
|
# Check file length regressions
|
|
crossed, grew = find_threshold_regressions(files, root_dir, args.compare_to, args.threshold)
|
|
|
|
if crossed:
|
|
print(f"⚠️ {len(crossed)} file(s) crossed {args.threshold} line threshold:\n")
|
|
for file_path, current, base in crossed:
|
|
relative_path = file_path.relative_to(root_dir)
|
|
if base is None:
|
|
print(f" {relative_path}: {current:,} lines (new file)")
|
|
else:
|
|
print(f" {relative_path}: {base:,} → {current:,} lines (+{current - base:,})")
|
|
print()
|
|
violations = True
|
|
else:
|
|
print(f"✅ No files crossed {args.threshold} line threshold")
|
|
|
|
if grew:
|
|
print(f"⚠️ {len(grew)} already-large file(s) grew larger:\n")
|
|
for file_path, current, base in grew:
|
|
relative_path = file_path.relative_to(root_dir)
|
|
print(f" {relative_path}: {base:,} → {current:,} lines (+{current - base:,})")
|
|
print()
|
|
violations = True
|
|
else:
|
|
print(f"✅ No already-large files grew")
|
|
|
|
# Check new duplicate function names
|
|
new_dupes = find_duplicate_regressions(files, root_dir, args.compare_to)
|
|
|
|
if new_dupes:
|
|
print(f"⚠️ {len(new_dupes)} new duplicate function name(s):\n")
|
|
for func_name in sorted(new_dupes.keys()):
|
|
paths = new_dupes[func_name]
|
|
print(f" {func_name}:")
|
|
for path in paths:
|
|
print(f" {path.relative_to(root_dir)}")
|
|
print()
|
|
violations = True
|
|
else:
|
|
print(f"✅ No new duplicate function names")
|
|
|
|
print()
|
|
if args.strict and violations:
|
|
sys.exit(1)
|
|
|
|
return
|
|
|
|
print(f"\n📂 Scanning: {root_dir}\n")
|
|
|
|
# Find and sort files by line count
|
|
files = find_code_files(root_dir)
|
|
files_desc = sorted(files, key=lambda x: x[1], reverse=True)
|
|
files_asc = sorted(files, key=lambda x: x[1])
|
|
|
|
# Show top N longest files
|
|
top_files = files_desc[:args.top]
|
|
|
|
print(f"📊 Top {min(args.top, len(top_files))} longest code files:\n")
|
|
print(f"{'Lines':>8} {'File'}")
|
|
print("-" * 60)
|
|
|
|
long_warnings = []
|
|
|
|
for file_path, line_count in top_files:
|
|
relative_path = file_path.relative_to(root_dir)
|
|
|
|
# Check if over threshold
|
|
if line_count >= args.threshold:
|
|
marker = " ⚠️"
|
|
long_warnings.append((relative_path, line_count))
|
|
else:
|
|
marker = ""
|
|
|
|
print(f"{line_count:>8} {relative_path}{marker}")
|
|
|
|
# Show bottom N shortest files
|
|
bottom_files = files_asc[:args.bottom]
|
|
|
|
print(f"\n📉 Bottom {min(args.bottom, len(bottom_files))} shortest code files:\n")
|
|
print(f"{'Lines':>8} {'File'}")
|
|
print("-" * 60)
|
|
|
|
short_warnings = []
|
|
|
|
for file_path, line_count in bottom_files:
|
|
relative_path = file_path.relative_to(root_dir)
|
|
filename = file_path.name
|
|
|
|
# Skip known barrel exports and stubs
|
|
is_expected_short = (
|
|
filename in SKIP_SHORT_PATTERNS or
|
|
any(filename.endswith(suffix) for suffix in SKIP_SHORT_SUFFIXES)
|
|
)
|
|
|
|
# Check if under threshold
|
|
if line_count <= args.min_threshold and not is_expected_short:
|
|
marker = " ⚠️"
|
|
short_warnings.append((relative_path, line_count))
|
|
else:
|
|
marker = ""
|
|
|
|
print(f"{line_count:>8} {relative_path}{marker}")
|
|
|
|
# Summary
|
|
total_files = len(files)
|
|
total_lines = sum(count for _, count in files)
|
|
|
|
print("-" * 60)
|
|
print(f"\n📈 Summary:")
|
|
print(f" Total code files: {total_files:,}")
|
|
print(f" Total lines: {total_lines:,}")
|
|
print(f" Average lines/file: {total_lines // total_files if total_files else 0:,}")
|
|
|
|
# Per-package breakdown
|
|
package_stats: dict[str, dict] = {}
|
|
for file_path, line_count in files:
|
|
pkg = get_package(file_path, root_dir)
|
|
if pkg not in package_stats:
|
|
package_stats[pkg] = {'files': 0, 'lines': 0}
|
|
package_stats[pkg]['files'] += 1
|
|
package_stats[pkg]['lines'] += line_count
|
|
|
|
print(f"\n📦 Per-package breakdown:\n")
|
|
print(f"{'Package':<15} {'Files':>8} {'Lines':>10} {'Avg':>8}")
|
|
print("-" * 45)
|
|
|
|
for pkg in sorted(package_stats.keys(), key=lambda p: package_stats[p]['lines'], reverse=True):
|
|
stats = package_stats[pkg]
|
|
avg = stats['lines'] // stats['files'] if stats['files'] else 0
|
|
print(f"{pkg:<15} {stats['files']:>8,} {stats['lines']:>10,} {avg:>8,}")
|
|
|
|
# Long file warnings
|
|
if long_warnings:
|
|
print(f"\n⚠️ Warning: {len(long_warnings)} file(s) exceed {args.threshold} lines (consider refactoring):")
|
|
for path, count in long_warnings:
|
|
print(f" - {path} ({count:,} lines)")
|
|
else:
|
|
print(f"\n✅ No files exceed {args.threshold} lines")
|
|
|
|
# Short file warnings
|
|
if short_warnings:
|
|
print(f"\n⚠️ Warning: {len(short_warnings)} file(s) are {args.min_threshold} lines or less (check if needed):")
|
|
for path, count in short_warnings:
|
|
print(f" - {path} ({count} lines)")
|
|
else:
|
|
print(f"\n✅ No files are {args.min_threshold} lines or less")
|
|
|
|
# Duplicate function names
|
|
duplicates = find_duplicate_functions(files, root_dir)
|
|
if duplicates:
|
|
print(f"\n⚠️ Warning: {len(duplicates)} function name(s) appear in multiple files (consider renaming):")
|
|
for func_name in sorted(duplicates.keys()):
|
|
paths = duplicates[func_name]
|
|
print(f" - {func_name}:")
|
|
for path in paths:
|
|
print(f" {path.relative_to(root_dir)}")
|
|
else:
|
|
print(f"\n✅ No duplicate function names")
|
|
|
|
print()
|
|
|
|
# Exit with error if --strict and there are violations
|
|
if args.strict and long_warnings:
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|