Files
reddit/r2/check-code
Neil Williams af09fa8dee Update license headers to 2015.
The highlight of each year for me.
2015-01-08 13:35:03 -08:00

402 lines
13 KiB
Python
Executable File

#!/usr/bin/python
# The contents of this file are subject to the Common Public Attribution
# License Version 1.0. (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
# License Version 1.1, but Sections 14 and 15 have been added to cover use of
# software over a computer network and provide for limited attribution for the
# Original Developer. In addition, Exhibit A has been modified to be consistent
# with Exhibit B.
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
# the specific language governing rights and limitations under the License.
#
# The Original Code is reddit.
#
# The Original Developer is the Initial Developer. The Initial Developer of
# the Original Code is reddit Inc.
#
# All portions of the code written by reddit are Copyright (c) 2006-2015 reddit
# Inc. All Rights Reserved.
###############################################################################
"""Check for new style guide violations in the current branch.
This script is meant to be used in a CI process to ensure that new changes
do not violate PEP-8, PEP-257, or any of the validity checks of pyflakes.
"""
import argparse
import collections
import difflib
import logging
import lxml.etree as etree
import os
import re
import subprocess
import sys
# Sink for subprocess output that nobody needs to see.
DEVNULL = open("/dev/null", "w")

# The checkers to run, in reporting order, each mapped to its base command
# line.  The path of the file under test is appended as the last argument.
TOOLS = collections.OrderedDict([
    ('pep8', ["pep8", "--repeat"]),
    ('pep257', ["pep257"]),
    ('pyflakes', ["pyflakes"]),
])

# Match *.py and *.pyx
PYFILE = re.compile(r".*\.pyx?$")
def assert_tools_available():
    """Exit with status 1 unless every external checker can be found.

    The first element of each command line in TOOLS is looked up with
    `which`.  The first one that is missing is logged as an error and the
    process exits.
    """
    for command_line in TOOLS.values():
        executable = command_line[0]
        try:
            subprocess.check_call(["which", executable], stdout=DEVNULL)
        except subprocess.CalledProcessError:
            logging.error("command %r not found. please install it!",
                          executable)
            sys.exit(1)
def assert_not_dirty():
    """Exit with status 1 if `git diff` reports changes in the work tree.

    NOTE(review): `git diff` without a ref compares the work tree against
    the index, so changes that are staged but not yet committed presumably
    slip past this check -- confirm whether that is intended.
    """
    command = ["git", "diff", "--no-ext-diff", "--quiet", "--exit-code"]
    # With --exit-code git returns non-zero when there are differences.
    if subprocess.call(command) != 0:
        logging.error("you have uncommitted changes. please commit them!")
        sys.exit(1)
def _parse_ref(ref):
    """Run `git rev-parse` on `ref` and return its stripped output."""
    return subprocess.check_output(["git", "rev-parse", ref]).strip()
def get_current_ref():
    """Return the branch HEAD points at, or HEAD resolved by rev-parse.

    `git symbolic-ref HEAD` fails when HEAD is not a symbolic ref.  In that
    case the result of `git rev-parse HEAD` is returned instead.
    """
    try:
        symbolic = subprocess.check_output(["git", "symbolic-ref", "HEAD"])
    except subprocess.CalledProcessError:
        return _parse_ref("HEAD")
    # Drop the leading "refs/heads/" so only the branch name remains.
    prefix_length = len("refs/heads/")
    return symbolic.strip()[prefix_length:]
def get_upstream_ref():
    """Return the ref that this topic branch is based on.

    That is taken to be whatever `master@{upstream}` resolves to.
    """
    upstream = _parse_ref("master@{upstream}")
    return upstream
def get_merge_base():
    """Return the merge base of the upstream ref and the current ref."""
    refs = [get_upstream_ref(), get_current_ref()]
    merge_base = subprocess.check_output(['git', 'merge-base'] + refs)
    return merge_base.strip()
def get_root():
    """Return the absolute path of the root directory of this git project.

    `git rev-parse --show-toplevel` is used rather than the dirname of
    `--git-dir`.  The latter prints a relative ".git" when run from the top
    of the work tree, whose dirname is the empty string, and that empty
    string would then be handed to `git ls-files` as a pathspec.
    """
    return _parse_ref("--show-toplevel")
def check_ref_out(ref):
    """Check out `ref` with git, exiting with status 1 if that fails.

    git's own output is discarded.  Only the failure is logged.
    """
    status = subprocess.call(
        ["git", "checkout", ref],
        stdout=DEVNULL,
        stderr=DEVNULL,
    )
    if status != 0:
        logging.error("failed to check out %s", ref)
        sys.exit(1)
def walk_workspace():
    """Yield the path, joined onto the root, of every file git tracks."""
    root = get_root()
    listing = subprocess.check_output(
        ['git', 'ls-files', '--full-name', '--', root])
    for tracked in listing.splitlines():
        yield os.path.join(root, tracked)
def select_files(files):
    """Yield the entries of `files` that look like Python source.

    A path qualifies if it matches PYFILE or, failing that, if the first
    line of the file starts with "#!" and mentions python.  Files that
    cannot be opened or read are logged and skipped.
    """
    for path in files:
        if PYFILE.match(path):
            yield path
            continue

        try:
            with open(path) as candidate:
                first_line = candidate.readline()
        except (IOError, OSError):
            logging.exception("Unable to check-code against %s", path)
            continue

        # Yield only once the file is closed and we are outside the try
        # block, so the handle is not held open while the consumer works
        # and errors raised by the consumer are not mistaken for I/O errors.
        if first_line.startswith('#!') and 'python' in first_line:
            yield path
def extract_errtype(violation, tool):
    """Based on a line of `tool`'s output, return the kind of infraction.

    Mostly relevant for pep8, which has various kinds of infractions,
    such as E501 for line too long.  For pep8 the text before the first
    space in `violation` is the error type, or 'PEP8' when there is no
    space.  pep257 and pyflakes always report 'PEP257' and 'pyflakes'.
    Any other tool is reported under its own name.
    """
    if tool == 'pep8':
        # E501 line too long (91 characters)
        code, space, _ = violation.partition(" ")
        return code if space else 'PEP8'

    fixed_types = {'pep257': 'PEP257', 'pyflakes': 'pyflakes'}
    # Fall back to the tool name so a tool without a dedicated entry still
    # gets an error type instead of raising UnboundLocalError.
    return fixed_types.get(tool, tool)
def make_test_class(tool, filepath):
    """Return the test class name for `tool` run against `filepath`.

    The file extension is dropped and path separators become underscores,
    so ('pep8', 'r2/lib/utils.py') gives 'pep8.r2_lib_utils'.
    """
    stem = os.path.splitext(filepath)[0]
    return tool + '.' + stem.replace(os.path.sep, '_')
def extract_line_info(reportline, filepath, tool):
    """Parse one line of `tool`'s output into a report entry.

    The line is split at its first ": ".  What follows is the violation
    text.  What precedes is split on ":" and its second field is taken as
    the line number (left empty, with a warning, if there is none).

    Returns a dict with the keys 'file', 'test_class', 'line', 'violation',
    'errtype' and 'tool', or None when the line has no ": " separator or is
    pep257's "Note: checks" banner.  `filepath` is what is recorded as the
    entry's file, not the path found in `reportline`.
    """
    if tool == 'pep257' and reportline.startswith('Note: checks'):
        return None

    location, sep, violation = reportline.partition(": ")
    if not sep:
        return None

    location_fields = location.split(":")
    if len(location_fields) >= 2:
        line_num = location_fields[1]
    else:
        # logging.warning is used because logging.warn is a deprecated alias.
        logging.warning("I don't understand this report line: %r",
                        reportline)
        line_num = ''

    return {
        'file': filepath,
        'test_class': make_test_class(tool, filepath),
        'line': line_num,
        'violation': violation,
        'errtype': extract_errtype(violation, tool),
        'tool': tool,
    }
def generate_report(toolname, files=None):
    """Run `toolname` over the Python files in `files` and collect entries.

    When `files` is empty or None, every file returned by walk_workspace()
    is considered instead.  Returns the list of report entries that
    extract_line_info produced, with file paths relative to the root.
    """
    if not files:
        files = walk_workspace()

    report = []
    ws_root = None
    for filepath in select_files(files):
        command = TOOLS[toolname] + [filepath]
        logging.info(" ".join(command))
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            # Error output is folded into stdout and parsed along with it.
            stderr=subprocess.STDOUT,
        )
        output = process.communicate()[0]

        # Ask git for the root only once rather than spawning a subprocess
        # for every checked file.  It is still looked up lazily so nothing
        # is run when there are no files to check.
        if ws_root is None:
            ws_root = get_root()
        ws_filepath = os.path.relpath(filepath, ws_root)

        for line in output.splitlines():
            entry = extract_line_info(line, ws_filepath, toolname)
            if entry:
                report.append(entry)
    return report
def generate_all_reports(ref=None, files=None):
    """Run the tools on the specified files and return errors / warnings.

    If `ref` is given it is checked out first.  The result is an ordered
    mapping from each tool name in TOOLS to that tool's report.
    """
    if ref:
        check_ref_out(ref)

    reports = collections.OrderedDict()
    for toolname in TOOLS:
        reports[toolname] = generate_report(toolname, files)
    return reports
def get_changed_files(old_ref, new_ref):
    """Return a list of files that have changed from one ref to another.

    The names reported by `git diff --name-only` are joined onto the
    project root.
    """
    root = get_root()
    diff_output = subprocess.check_output(
        ["git", "diff", "--name-only", old_ref, new_ref])
    return [os.path.join(root, name) for name in diff_output.splitlines()]
def diffable(report):
    """Convert the report to a list of lines that are reasonably 'diffable'.

    That is, standard diff tools should be able to identify new or fixed
    violations by comparing results of this function.  Each tool contributes
    its name, one "<file> <violation>" line per violation (the line number
    is left out), and a blank line.
    """
    lines = []
    # .items() rather than .iteritems() so this runs on Python 2 and 3.
    for toolname, violations in report.items():
        lines.append(toolname)
        lines.extend('%(file)s %(violation)s' % v for v in violations)
        lines.append('')
    return lines
def human(report):
    """Convert the report to a list of human useful lines.

    Each tool contributes its name, one "<file>:<line> <violation>" line
    per violation, and a blank line.
    """
    lines = []
    # .items() rather than .iteritems() so this runs on Python 2 and 3.
    for toolname, violations in report.items():
        lines.append(toolname)
        lines.extend('%(file)s:%(line)s %(violation)s' % v
                     for v in violations)
        lines.append('')
    return lines
def junitize(report):
    """Convert the report into JUnit style XML.

    This allows the report to be consumed by tools that consume JUnit reports

    The style used here is: each file is a <testsuite> named after the file;
    each violation will be a <testcase> (always failed) whose "classname"
    shall be the violation's test class (the tool plus a name derived from
    the file) and "name" shall be the type of violation and line number.
    Any additional information shall be included as the <failure> message.

    Returns the root <testsuites> element.
    """
    # Group the violations of every tool by file, keeping first-seen order
    # so the generated document is stable from run to run.
    by_file = collections.OrderedDict()
    # .values() rather than .itervalues() so this runs on Python 2 and 3.
    for violations in report.values():
        for violation in violations:
            by_file.setdefault(violation['file'], []).append(violation)

    suites = etree.Element("testsuites")
    for filename, file_violations in by_file.items():
        suite = etree.SubElement(suites, "testsuite")
        suite.attrib['name'] = filename
        for violation in file_violations:
            case = etree.SubElement(suite, "testcase")
            case.attrib['classname'] = violation['test_class']
            # "<errtype>.<line>", as the docstring promises, not just the
            # line number.
            case.attrib['name'] = make_errname(violation)
            failure = etree.SubElement(case, "failure")
            failure.attrib['message'] = violation['violation']
            failure.attrib['type'] = violation['errtype']
    return suites
def make_errname(violation):
    """Build the "test name" for this violation: '<errtype>.<line>'."""
    parts = [violation['errtype'], violation['line']]
    return '.'.join(parts)
def diff_report(options):
    """Return a unified diff of the violations at the merge base vs. now.

    The files checked are `options.files` or, when none were given, the
    files that changed between the merge base and the current ref.  Both
    refs are checked out in turn to generate their reports, and the current
    ref is checked out again afterwards even if report generation fails.
    If `options.check_dirty` is set, the dirty-workspace check runs first.
    """
    if options.check_dirty:
        assert_not_dirty()

    head = get_current_ref()
    base = get_merge_base()
    files = options.files or get_changed_files(base, head)
    logging.debug("files changed: %r", files)

    try:
        new_report = diffable(generate_all_reports(head, files))
        logging.debug("new report:\n%r", new_report)
        old_report = diffable(generate_all_reports(base, files))
        logging.debug("old report:\n%r", old_report)
    finally:
        # Whatever happened, put the work tree back on the starting ref.
        check_ref_out(head)

    return difflib.unified_diff(old_report, new_report)
def regression_report(options):
    """Write how many issues were added / removed and return an exit code.

    The lines of diff_report() are counted by their leading "+" or "-",
    skipping the "+++" / "---" file headers.  Non-zero totals are written
    to `options.out`.  Returns 1 if any issue was added, otherwise 0.
    """
    counts = {"+": 0, "-": 0}
    for raw_line in diff_report(options):
        entry = raw_line.strip()
        if entry in ("+++", "---"):
            continue
        marker = entry[:1]
        if marker in counts:
            counts[marker] += 1

    added, removed = counts["+"], counts["-"]
    if added:
        options.out.write("added %d issues\n" % added)
    if removed:
        options.out.write("removed %d issues!\n" % removed)
    return 1 if added else 0
def junit_report(options):
    """Write a pretty-printed JUnit XML report to `options.out`.

    The report covers `options.files`.
    """
    tree = junitize(generate_all_reports(files=options.files))
    xml_text = etree.tostring(tree, pretty_print=True)
    options.out.write(xml_text + "\n")
def human_report(options):
    """Write a human readable report to `options.out`, one line at a time.

    With `options.full` the report covers `options.files` as given.
    Otherwise it covers `options.files` or, when none were given, the files
    changed between the merge base and the current ref.
    """
    if options.full:
        files = options.files
    else:
        files = (options.files or
                 get_changed_files(get_merge_base(), get_current_ref()))
        logging.debug("changed files: %r", files)

    for line in human(generate_all_reports(files=files)):
        options.out.write(line + "\n")
def parse_args(args):
    """Parse the command line `args` and configure logging accordingly.

    Returns the argparse namespace with the attributes `check_dirty`,
    `out`, `verbosity`, `quiet`, `full`, `report` and `files`.  Logging is
    set up with a verbosity of the -v count minus the -q count.
    """
    parser = argparse.ArgumentParser(description="Report on python problems")
    add = parser.add_argument

    # check_dirty defaults to True.  Passing --dirty switches the check off.
    add('--dirty', dest='check_dirty', action='store_false',
        help="Skip the dirty workspace check.")
    add('-O', dest='out', type=argparse.FileType('w'), default=sys.stdout,
        help="Write the report to OUT instead of stdout.")
    add('--verbose', '-v', action='count', dest='verbosity',
        help="Show verbose reporting messages.", default=0)
    add('--quiet', '-q', action='count', default=0,
        help="Reduce verbosity")
    add('--full', action='store_true',
        help="When generating a {report}, show all files, not just "
             "changed ones.")
    add('report', choices=('junit', 'regression', 'report'))
    add('files', nargs='*', metavar='FILE')

    options = parser.parse_args(args)
    set_up_logging(options.verbosity - options.quiet)
    logging.debug("Options: %r", options)
    return options
def set_up_logging(verbosity):
    """Configure logging at the level selected by `verbosity`.

    `verbosity` is clamped to the range -2..1, which maps onto ERROR, WARN,
    INFO and DEBUG respectively.
    """
    levels = {
        -2: logging.ERROR,
        -1: logging.WARN,
        0: logging.INFO,
        1: logging.DEBUG,
    }
    quietest, loudest = min(levels), max(levels)
    clamped = max(min(verbosity, loudest), quietest)
    logging.basicConfig(level=levels[clamped],
                        format='%(levelname)s %(message)s')
def main():
    """Run the report named on the command line and exit with its result.

    The external tools are checked for only after the arguments have been
    parsed.
    """
    options = parse_args(sys.argv[1:])
    reports = {
        'regression': regression_report,
        'junit': junit_report,
        'report': human_report,
    }
    command = reports[options.report]
    assert_tools_available()
    sys.exit(command(options))


if __name__ == "__main__":
    main()