From c8f10bb7b85135ca4ef439934e8977613dfe8a39 Mon Sep 17 00:00:00 2001 From: David King Date: Thu, 28 Jan 2016 15:56:58 -0800 Subject: [PATCH] Parallelise parts of mr_top jobs **Explanation**: compute_time_listings is slow. Really slow. At a quick glance, here are the jobs running right now: date: Sun Jan 17 20:04:56 PST 2016 -rw-rw-r-- 1 ri ri 1.2G Jan 17 12:37 comment-week-data.dump -rw-rw-r-- 1 ri ri 683M Jan 17 12:25 comment-week-thing.dump -rw-rw-r-- 1 ri ri 53G Jan 16 07:13 comment-year-data.dump -rw-rw-r-- 1 ri ri 31G Jan 16 04:37 comment-year-thing.dump -rw-rw-r-- 1 ri ri 276M Jan 17 17:04 link-week-data.dump -rw-rw-r-- 1 ri ri 70M Jan 17 17:03 link-week-thing.dump So the currently running top-comments-by-year listing has been running for nearly 37 hours and isn't done. top-comments-by-week has been running for 8 hours. top-links-by-week has been running for 3 hours. And this is just me checking on currently running jobs, not actual completion times. The slow bit is the actual writing to Cassandra in `write_permacache`. This is mostly because `write_permacache` is extremely naive and blocks waiting for individual writes with no batching or parallelisation. There are a lot of ways to work around this and some of them will become easier when we're not longer writing out to the permacache at all, but until then (and even after that) this approach lets us keep doing the simple-to-understand thing while parallelising some of the work. **The approach**: `compute_time_listings` is written as a mapreduce job in our `mr_tools` toolkit, with `write_permacache` as the final reducer. In `mr_tools`, you can run multiple reducers as long as a given reducer can be guaranteed to receive all of the keys for the same key. So this patch adds `hashdist.py`, a tool that runs multiple copies of a target job and distributes lines to them from stdin using their first tab-delimited field to meet this promise. (The same script could apply to mappers and sorts too but in my tests for this job the gains were minimal because `write_permacache` is still the bottleneck up to a large number of reducers.) **Numbers**: A top-links-by-hour listing in prod right now takes 1m46.387s to run. This patch reduces that to 0m43.960s using 2 jobs (a 60% savings). That top-links-by-week job that before I killed after 3 hours completed in 56m47.329s. The top-links-by-year job that I killed last week at over 36 hours finished in 19 hours. **Downsides**: It costs some additional RAM: roughly 10mb for hashdist.py and 100mb in memory for each additional copy of the job. It multiplies the effective load on Cassandra by the number of jobs (although I have no reason to believe that it's practical to overload Cassandra this way right now; I've tested up to 5 jobs). **Further work**: with this we could easily do sort|reducer fusion to significantly reduce the work required by the sorter. `hashdist.py` as written is pretty slow and is only acceptable because `write_permcache` is even slower; a non-Python implementation would be straight forward and way faster. --- install/reddit.sh | 4 +- r2/r2/lib/mr_top.py | 7 ++ scripts/compute_time_listings | 33 ++++++- scripts/hashdist.py | 181 ++++++++++++++++++++++++++++++++++ 4 files changed, 220 insertions(+), 5 deletions(-) create mode 100755 scripts/hashdist.py diff --git a/install/reddit.sh b/install/reddit.sh index dea3a71a6..a776b630b 100755 --- a/install/reddit.sh +++ b/install/reddit.sh @@ -633,8 +633,8 @@ if [ ! -f /etc/cron.d/reddit ]; then # jobs that recalculate time-limited listings (e.g. top this year) PGPASSWORD=password -*/15 * * * * $REDDIT_USER $REDDIT_SRC/reddit/scripts/compute_time_listings link year '("hour", "day", "week", "month", "year")' -*/15 * * * * $REDDIT_USER $REDDIT_SRC/reddit/scripts/compute_time_listings comment year '("hour", "day", "week", "month", "year")' +*/15 * * * * $REDDIT_USER $REDDIT_SRC/reddit/scripts/compute_time_listings link year "['hour', 'day', 'week', 'month', 'year']" +*/15 * * * * $REDDIT_USER $REDDIT_SRC/reddit/scripts/compute_time_listings comment year "['hour', 'day', 'week', 'month', 'year']" # disabled by default, uncomment if you need these jobs #* * * * * root /sbin/start --quiet reddit-job-email diff --git a/r2/r2/lib/mr_top.py b/r2/r2/lib/mr_top.py index ebf449eda..0ccc495fa 100644 --- a/r2/r2/lib/mr_top.py +++ b/r2/r2/lib/mr_top.py @@ -144,3 +144,10 @@ def write_permacache(fd = sys.stdin): mr_tools.mr_reduce_max_per_key(lambda x: map(float, x[:-1]), num=1000, post=store_keys, fd = fd) + +def reduce_listings(fd=sys.stdin): + # like write_permacache, but just sends the reduced version of the listing + # to stdout instead of to the permacache. It's handy for debugging to see + # the final result before it's written out + mr_tools.mr_reduce_max_per_key(lambda x: map(float, x[:-1]), num=1000, + fd = fd) diff --git a/scripts/compute_time_listings b/scripts/compute_time_listings index 36bc16bc9..d13eefad9 100755 --- a/scripts/compute_time_listings +++ b/scripts/compute_time_listings @@ -69,7 +69,34 @@ function mrsort { } function reddit { - paster --plugin=r2 run $REDDIT_INI $REDDIT_ROOT/r2/lib/mr_top.py -c "$1" + reddit_usage() { + echo "reddit: [-jN] cmd..." 2>&1 + exit + } + + local OPTIND o njobs + + njobs=1 + + while getopts ":j:" o; do + case "${o}" in + j) + njobs="${OPTARG}" + ;; + *) + reddit_usage + ;; + esac + done + shift $((OPTIND-1)) + + cmd="paster --plugin=r2 run $REDDIT_INI $REDDIT_ROOT/r2/lib/mr_top.py -c \"$@ # $THING_CLS $INTERVAL $TIMES\"" + + if [ "$njobs" = "1" ]; then + sh -c "$cmd" # just execute it directly + else + $REDDIT_ROOT/../scripts/hashdist.py -n"$njobs" -- sh -c "$cmd" + fi } # Hack to let pg fetch all things with intervals @@ -105,9 +132,9 @@ run_query "\\copy (SELECT thing_id, 'data', '$THING_CLS', key, value cat $THING_DUMP $DATA_DUMP | mrsort | - reddit "join_things('$THING_CLS') # $TIMES" | + reddit "join_things('$THING_CLS')" | reddit "time_listings($TIMES, '$THING_CLS')" | mrsort | - reddit "write_permacache() # $THING_CLS $TIMES" + reddit -j4 "write_permacache()" echo 'Done.' diff --git a/scripts/hashdist.py b/scripts/hashdist.py new file mode 100755 index 000000000..fc92bc9fb --- /dev/null +++ b/scripts/hashdist.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python2.7 + +from Queue import Queue +import argparse +import logging +import multiprocessing +import os +import re +import string +import subprocess +import sys +import threading + + +def parse_size(s): + def mult(multiplier): + return int(s[:-1])*multiplier + + if all(x in string.digits for x in s): + return int(s) + if s.endswith('b'): + return mult(1) + if s.endswith('k'): + return mult(1024) + if s.endswith('m'): + return mult(1024*1024) + if s.endswith('g'): + return mult(1024*1024*1024) + raise Exception("Can't parse %r" % (s,)) + + +class JobInputter(threading.Thread): + """ + Takes input originally from stdin through iq and sends it to the job + """ + def __init__(self, job_name, popen, iq): + self.job_name = job_name + self.popen = popen + self.iq = iq + super(JobInputter, self).__init__() + + def __repr__(self): + return "<%s %s>" % (self.__class__.__name__, self.job_name) + + def run(self): + while True: + item = self.iq.get() + logging.debug("%r got item %r", self, item) + if item is None: + logging.debug("%r closing %r", self, self.popen.stdin) + self.popen.stdin.close() + self.iq.task_done() + break + + try: + self.popen.stdin.write(item) + self.popen.stdin.flush() + self.iq.task_done() + except IOError: + logging.exception("exception writing to popen %r", self.popen) + return os._exit(1) + + +class JobOutputter(threading.Thread): + """ + Takes output from the job and sends it to stdout + """ + def __init__(self, job_name, popen, out_fd, lock): + self.job_name = job_name + self.popen = popen + self.out_fd = out_fd + self.lock = lock + super(JobOutputter, self).__init__() + + def __repr__(self): + return "<%s %s>" % (self.__class__.__name__, self.job_name) + + def run(self): + for line in self.popen.stdout: + logging.debug("%r read %d bytes", self, len(line)) + with self.lock: + try: + self.out_fd.write(line) + except IOError as e: + if e.errno != errno.EPIPE: + logging.exception("exception writing to output %r", self.out_fd) + return os._exit(1) + + logging.debug("Got eof on %r", self) + + +def hash_select(key, choices): + return choices[hash(key) % len(choices)] + + +def main(): + try: + return _main() + except KeyboardInterrupt: + # because we mess with threads a lot, we need to make sure that ^C is + # actually a nuclear kill + os._exit(1) + +def _main(): + parser = argparse.ArgumentParser() + parser.add_argument('-n', metavar='N', type=int, + default=multiprocessing.cpu_count(), dest='nprocs') + parser.add_argument('-b', '--buffer', metavar='N', type=parse_size, + help="size (in lines) of input buffer for each process", + default=1024, + dest='bufsize') + parser.add_argument('-f', metavar='FIELDSEP', type=str, default='\t', + dest='field_sep') + parser.add_argument('-r', metavar='FIELDRE', type=str, default=None, + dest='field_re') + parser.add_argument('--logging', help=argparse.SUPPRESS, default='error') + parser.add_argument('cmd', nargs='+') + + args = parser.parse_args() + + if args.field_re and args.field_sep: + args.print_usage() + return sys.exit(1) + + if args.nprocs == 1: + # if you only want one, what do you need me for? + os.execvp(args.cmd[0], args.cmd) + return sys.exit(1) # will never get here + + if args.field_re: + first_field_re = re.compile(args.field_re) + else: + first_field_re = re.compile('^([^'+re.escape(args.field_sep)+']+)') + + logging.basicConfig(level=getattr(logging, args.logging.upper())) + + stdout_mutex = threading.Lock() + processes = [] + + for x in range(args.nprocs): + logging.debug("Starting %r (%d)", args.cmd, x) + ps = subprocess.Popen(args.cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE) + psi = JobInputter(x, ps, Queue(maxsize=args.bufsize)) + pso = JobOutputter(x, ps, sys.stdout, stdout_mutex) + psi.start() + pso.start() + processes.append((psi, pso)) + + for line in sys.stdin: + if not line: + continue + + logging.debug("Read %d bytes from stdin", len(line)) + + first_field_m = first_field_re.match(line) + first_field = first_field_m.group(0) + psi, _pso = hash_select(first_field, processes) + logging.debug("Writing %d bytes to %r (%r)", len(line), psi, first_field) + psi.iq.put(line) + + logging.debug("Hit eof on stdin") + + for x, (psi, pso) in enumerate(processes): + logging.debug("Sending terminator to %d (%r)", x, psi) + psi.iq.put(None) + + for x, (psi, pso) in enumerate(processes): + logging.debug("Waiting for q %d (%r)", x, psi) + psi.iq.join() + logging.debug("Waiting for psi %d (%r)", x, psi) + psi.join() + logging.debug("Waiting for pso %d (%r)", x, psi) + pso.join() + + return sys.exit(0) + + +if __name__ == '__main__': + main()