diff --git a/install/reddit.sh b/install/reddit.sh index dea3a71a6..a776b630b 100755 --- a/install/reddit.sh +++ b/install/reddit.sh @@ -633,8 +633,8 @@ if [ ! -f /etc/cron.d/reddit ]; then # jobs that recalculate time-limited listings (e.g. top this year) PGPASSWORD=password -*/15 * * * * $REDDIT_USER $REDDIT_SRC/reddit/scripts/compute_time_listings link year '("hour", "day", "week", "month", "year")' -*/15 * * * * $REDDIT_USER $REDDIT_SRC/reddit/scripts/compute_time_listings comment year '("hour", "day", "week", "month", "year")' +*/15 * * * * $REDDIT_USER $REDDIT_SRC/reddit/scripts/compute_time_listings link year "['hour', 'day', 'week', 'month', 'year']" +*/15 * * * * $REDDIT_USER $REDDIT_SRC/reddit/scripts/compute_time_listings comment year "['hour', 'day', 'week', 'month', 'year']" # disabled by default, uncomment if you need these jobs #* * * * * root /sbin/start --quiet reddit-job-email diff --git a/r2/r2/lib/mr_top.py b/r2/r2/lib/mr_top.py index ebf449eda..0ccc495fa 100644 --- a/r2/r2/lib/mr_top.py +++ b/r2/r2/lib/mr_top.py @@ -144,3 +144,10 @@ def write_permacache(fd = sys.stdin): mr_tools.mr_reduce_max_per_key(lambda x: map(float, x[:-1]), num=1000, post=store_keys, fd = fd) + +def reduce_listings(fd=sys.stdin): + # like write_permacache, but just sends the reduced version of the listing + # to stdout instead of to the permacache. It's handy for debugging to see + # the final result before it's written out + mr_tools.mr_reduce_max_per_key(lambda x: map(float, x[:-1]), num=1000, + fd = fd) diff --git a/scripts/compute_time_listings b/scripts/compute_time_listings index 36bc16bc9..d13eefad9 100755 --- a/scripts/compute_time_listings +++ b/scripts/compute_time_listings @@ -69,7 +69,34 @@ function mrsort { } function reddit { - paster --plugin=r2 run $REDDIT_INI $REDDIT_ROOT/r2/lib/mr_top.py -c "$1" + reddit_usage() { + echo "reddit: [-jN] cmd..." 2>&1 + exit + } + + local OPTIND o njobs + + njobs=1 + + while getopts ":j:" o; do + case "${o}" in + j) + njobs="${OPTARG}" + ;; + *) + reddit_usage + ;; + esac + done + shift $((OPTIND-1)) + + cmd="paster --plugin=r2 run $REDDIT_INI $REDDIT_ROOT/r2/lib/mr_top.py -c \"$@ # $THING_CLS $INTERVAL $TIMES\"" + + if [ "$njobs" = "1" ]; then + sh -c "$cmd" # just execute it directly + else + $REDDIT_ROOT/../scripts/hashdist.py -n"$njobs" -- sh -c "$cmd" + fi } # Hack to let pg fetch all things with intervals @@ -105,9 +132,9 @@ run_query "\\copy (SELECT thing_id, 'data', '$THING_CLS', key, value cat $THING_DUMP $DATA_DUMP | mrsort | - reddit "join_things('$THING_CLS') # $TIMES" | + reddit "join_things('$THING_CLS')" | reddit "time_listings($TIMES, '$THING_CLS')" | mrsort | - reddit "write_permacache() # $THING_CLS $TIMES" + reddit -j4 "write_permacache()" echo 'Done.' diff --git a/scripts/hashdist.py b/scripts/hashdist.py new file mode 100755 index 000000000..fc92bc9fb --- /dev/null +++ b/scripts/hashdist.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python2.7 + +from Queue import Queue +import argparse +import logging +import multiprocessing +import os +import re +import string +import subprocess +import sys +import threading + + +def parse_size(s): + def mult(multiplier): + return int(s[:-1])*multiplier + + if all(x in string.digits for x in s): + return int(s) + if s.endswith('b'): + return mult(1) + if s.endswith('k'): + return mult(1024) + if s.endswith('m'): + return mult(1024*1024) + if s.endswith('g'): + return mult(1024*1024*1024) + raise Exception("Can't parse %r" % (s,)) + + +class JobInputter(threading.Thread): + """ + Takes input originally from stdin through iq and sends it to the job + """ + def __init__(self, job_name, popen, iq): + self.job_name = job_name + self.popen = popen + self.iq = iq + super(JobInputter, self).__init__() + + def __repr__(self): + return "<%s %s>" % (self.__class__.__name__, self.job_name) + + def run(self): + while True: + item = self.iq.get() + logging.debug("%r got item %r", self, item) + if item is None: + logging.debug("%r closing %r", self, self.popen.stdin) + self.popen.stdin.close() + self.iq.task_done() + break + + try: + self.popen.stdin.write(item) + self.popen.stdin.flush() + self.iq.task_done() + except IOError: + logging.exception("exception writing to popen %r", self.popen) + return os._exit(1) + + +class JobOutputter(threading.Thread): + """ + Takes output from the job and sends it to stdout + """ + def __init__(self, job_name, popen, out_fd, lock): + self.job_name = job_name + self.popen = popen + self.out_fd = out_fd + self.lock = lock + super(JobOutputter, self).__init__() + + def __repr__(self): + return "<%s %s>" % (self.__class__.__name__, self.job_name) + + def run(self): + for line in self.popen.stdout: + logging.debug("%r read %d bytes", self, len(line)) + with self.lock: + try: + self.out_fd.write(line) + except IOError as e: + if e.errno != errno.EPIPE: + logging.exception("exception writing to output %r", self.out_fd) + return os._exit(1) + + logging.debug("Got eof on %r", self) + + +def hash_select(key, choices): + return choices[hash(key) % len(choices)] + + +def main(): + try: + return _main() + except KeyboardInterrupt: + # because we mess with threads a lot, we need to make sure that ^C is + # actually a nuclear kill + os._exit(1) + +def _main(): + parser = argparse.ArgumentParser() + parser.add_argument('-n', metavar='N', type=int, + default=multiprocessing.cpu_count(), dest='nprocs') + parser.add_argument('-b', '--buffer', metavar='N', type=parse_size, + help="size (in lines) of input buffer for each process", + default=1024, + dest='bufsize') + parser.add_argument('-f', metavar='FIELDSEP', type=str, default='\t', + dest='field_sep') + parser.add_argument('-r', metavar='FIELDRE', type=str, default=None, + dest='field_re') + parser.add_argument('--logging', help=argparse.SUPPRESS, default='error') + parser.add_argument('cmd', nargs='+') + + args = parser.parse_args() + + if args.field_re and args.field_sep: + args.print_usage() + return sys.exit(1) + + if args.nprocs == 1: + # if you only want one, what do you need me for? + os.execvp(args.cmd[0], args.cmd) + return sys.exit(1) # will never get here + + if args.field_re: + first_field_re = re.compile(args.field_re) + else: + first_field_re = re.compile('^([^'+re.escape(args.field_sep)+']+)') + + logging.basicConfig(level=getattr(logging, args.logging.upper())) + + stdout_mutex = threading.Lock() + processes = [] + + for x in range(args.nprocs): + logging.debug("Starting %r (%d)", args.cmd, x) + ps = subprocess.Popen(args.cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE) + psi = JobInputter(x, ps, Queue(maxsize=args.bufsize)) + pso = JobOutputter(x, ps, sys.stdout, stdout_mutex) + psi.start() + pso.start() + processes.append((psi, pso)) + + for line in sys.stdin: + if not line: + continue + + logging.debug("Read %d bytes from stdin", len(line)) + + first_field_m = first_field_re.match(line) + first_field = first_field_m.group(0) + psi, _pso = hash_select(first_field, processes) + logging.debug("Writing %d bytes to %r (%r)", len(line), psi, first_field) + psi.iq.put(line) + + logging.debug("Hit eof on stdin") + + for x, (psi, pso) in enumerate(processes): + logging.debug("Sending terminator to %d (%r)", x, psi) + psi.iq.put(None) + + for x, (psi, pso) in enumerate(processes): + logging.debug("Waiting for q %d (%r)", x, psi) + psi.iq.join() + logging.debug("Waiting for psi %d (%r)", x, psi) + psi.join() + logging.debug("Waiting for pso %d (%r)", x, psi) + pso.join() + + return sys.exit(0) + + +if __name__ == '__main__': + main()