reddit/scripts/hashdist.py
David King c8f10bb7b8 Parallelise parts of mr_top jobs
**Explanation**: compute_time_listings is slow. Really slow. At a quick glance, here are the jobs running right now:

    date: Sun Jan 17 20:04:56 PST 2016
    -rw-rw-r-- 1 ri ri 1.2G Jan 17 12:37 comment-week-data.dump
    -rw-rw-r-- 1 ri ri 683M Jan 17 12:25 comment-week-thing.dump
    -rw-rw-r-- 1 ri ri  53G Jan 16 07:13 comment-year-data.dump
    -rw-rw-r-- 1 ri ri  31G Jan 16 04:37 comment-year-thing.dump
    -rw-rw-r-- 1 ri ri 276M Jan 17 17:04 link-week-data.dump
    -rw-rw-r-- 1 ri ri  70M Jan 17 17:03 link-week-thing.dump

So the currently running top-comments-by-year listing has been running for nearly 37 hours and isn't done. top-comments-by-week has been running for 8 hours. top-links-by-week has been running for 3 hours. And this is just me checking on currently running jobs, not actual completion times.

The slow bit is the actual writing to Cassandra in `write_permacache`. This is mostly because `write_permacache` is extremely naive and blocks waiting for individual writes with no batching or parallelisation. There are a lot of ways to work around this, and some of them will become easier when we're no longer writing out to the permacache at all, but until then (and even after that) this approach lets us keep doing the simple-to-understand thing while parallelising some of the work.

**The approach**: `compute_time_listings` is written as a mapreduce job in our `mr_tools` toolkit, with `write_permacache` as the final reducer. In `mr_tools`, you can run multiple reducers as long as each reducer is guaranteed to receive all of the lines for a given key. So this patch adds `hashdist.py`, a tool that runs multiple copies of a target job and distributes lines to them from stdin, hashing their first tab-delimited field to meet this promise. (The same script could apply to mappers and sorts too, but in my tests for this job the gains were minimal because `write_permacache` is still the bottleneck up to a large number of reducers.)
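As a rough illustration of that promise (a sketch, not the script itself): every line is routed by a hash of its first field, so all lines sharing a key end up in the same bucket. The names `first_field` and `partition_lines` are hypothetical, and `zlib.crc32` is an assumption used here only to make the example reproducible; `hashdist.py` itself uses the built-in `hash()`.

```python
import zlib
from collections import defaultdict


def first_field(line, sep="\t"):
    """Return the text before the first separator (the whole line if none)."""
    return line.split(sep, 1)[0]


def partition_lines(lines, num_jobs, sep="\t"):
    """Group lines into at most num_jobs buckets by a hash of their first field."""
    buckets = defaultdict(list)
    for line in lines:
        key = first_field(line, sep)
        # crc32 is an assumption of this sketch; hashdist.py uses hash()
        index = (zlib.crc32(key.encode("utf-8")) & 0xFFFFFFFF) % num_jobs
        buckets[index].append(line)
    return buckets


lines = ["a\t1\n", "b\t2\n", "a\t3\n", "c\t4\n", "b\t5\n"]
buckets = partition_lines(lines, num_jobs=2)

# record which bucket(s) each key landed in; each key should have exactly one
owners = defaultdict(set)
for index, bucket in buckets.items():
    for line in bucket:
        owners[first_field(line)].add(index)
```

Because the bucket depends only on the key, a reducer reading one bucket sees every line for the keys it owns, which is all `mr_tools` asks for.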

**Numbers**: A top-links-by-hour listing in prod right now takes 1m46.387s to run. This patch reduces that to 0m43.960s using 2 jobs (a saving of about 59%). The top-links-by-week job that I previously killed after 3 hours completed in 56m47.329s. The top-links-by-year job that I killed last week at over 36 hours finished in 19 hours.
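The saving for the hourly listing can be checked directly from the two timings quoted above. This is only arithmetic on those numbers; `to_seconds` is a hypothetical helper.

```python
def to_seconds(minutes, seconds):
    """Convert a `time`-style XmY.YYYs reading to seconds."""
    return minutes * 60 + seconds


before = to_seconds(1, 46.387)  # top-links-by-hour as it runs in prod
after = to_seconds(0, 43.960)   # the same listing with 2 jobs

saving = 1 - after / before     # fraction of wall-clock time saved
speedup = before / after        # how many times faster the run is

print("saving: %.1f%%, speedup: %.2fx" % (saving * 100, speedup))
```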

**Downsides**: It costs some additional RAM: roughly 10 MB for `hashdist.py` and 100 MB for each additional copy of the job. It multiplies the effective load on Cassandra by the number of jobs (although I have no reason to believe that it's practical to overload Cassandra this way right now; I've tested up to 5 jobs).
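A back-of-the-envelope version of the RAM cost, using only the rough figures above. `extra_ram_mb` is a hypothetical helper, and the constants are estimates rather than measurements of any particular run.

```python
HASHDIST_MB = 10        # approximate overhead of hashdist.py itself
PER_EXTRA_JOB_MB = 100  # approximate cost of each copy beyond the first


def extra_ram_mb(num_jobs):
    """Estimate the additional RAM (in MB) of running num_jobs copies."""
    if num_jobs <= 1:
        # with -n 1, hashdist.py execs the command directly: no overhead
        return 0
    return HASHDIST_MB + PER_EXTRA_JOB_MB * (num_jobs - 1)


estimates = {n: extra_ram_mb(n) for n in (1, 2, 5)}
```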

**Further work**: with this we could easily do sort|reducer fusion to significantly reduce the work required by the sorter. `hashdist.py` as written is pretty slow and is only acceptable because `write_permacache` is even slower; a non-Python implementation would be straightforward and way faster.
2016-02-18 15:35:58 -08:00


#!/usr/bin/env python2.7
from Queue import Queue
import argparse
import errno
import logging
import multiprocessing
import os
import re
import string
import subprocess
import sys
import threading


def parse_size(s):
    def mult(multiplier):
        return int(s[:-1])*multiplier

    if all(x in string.digits for x in s):
        return int(s)
    if s.endswith('b'):
        return mult(1)
    if s.endswith('k'):
        return mult(1024)
    if s.endswith('m'):
        return mult(1024*1024)
    if s.endswith('g'):
        return mult(1024*1024*1024)
    raise Exception("Can't parse %r" % (s,))


class JobInputter(threading.Thread):
    """
    Takes input originally from stdin through iq and sends it to the job
    """
    def __init__(self, job_name, popen, iq):
        self.job_name = job_name
        self.popen = popen
        self.iq = iq
        super(JobInputter, self).__init__()

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.job_name)

    def run(self):
        while True:
            item = self.iq.get()
            logging.debug("%r got item %r", self, item)
            if item is None:
                logging.debug("%r closing %r", self, self.popen.stdin)
                self.popen.stdin.close()
                self.iq.task_done()
                break

            try:
                self.popen.stdin.write(item)
                self.popen.stdin.flush()
                self.iq.task_done()
            except IOError:
                logging.exception("exception writing to popen %r", self.popen)
                return os._exit(1)


class JobOutputter(threading.Thread):
    """
    Takes output from the job and sends it to stdout
    """
    def __init__(self, job_name, popen, out_fd, lock):
        self.job_name = job_name
        self.popen = popen
        self.out_fd = out_fd
        self.lock = lock
        super(JobOutputter, self).__init__()

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.job_name)

    def run(self):
        for line in self.popen.stdout:
            logging.debug("%r read %d bytes", self, len(line))
            with self.lock:
                try:
                    self.out_fd.write(line)
                except IOError as e:
                    if e.errno != errno.EPIPE:
                        logging.exception("exception writing to output %r", self.out_fd)
                    return os._exit(1)
        logging.debug("Got eof on %r", self)


def hash_select(key, choices):
    return choices[hash(key) % len(choices)]


def main():
    try:
        return _main()
    except KeyboardInterrupt:
        # because we mess with threads a lot, we need to make sure that ^C is
        # actually a nuclear kill
        os._exit(1)


def _main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', metavar='N', type=int,
                        default=multiprocessing.cpu_count(), dest='nprocs')
    parser.add_argument('-b', '--buffer', metavar='N', type=parse_size,
                        help="size (in lines) of input buffer for each process",
                        default=1024,
                        dest='bufsize')
    # -f defaults to None (meaning tab) so that passing -r alone is not
    # mistaken for passing both -f and -r
    parser.add_argument('-f', metavar='FIELDSEP', type=str, default=None,
                        dest='field_sep')
    parser.add_argument('-r', metavar='FIELDRE', type=str, default=None,
                        dest='field_re')
    parser.add_argument('--logging', help=argparse.SUPPRESS, default='error')
    parser.add_argument('cmd', nargs='+')

    args = parser.parse_args()

    if args.field_re and args.field_sep:
        # -f and -r are mutually exclusive
        parser.print_usage()
        return sys.exit(1)

    if args.nprocs == 1:
        # if you only want one, what do you need me for?
        os.execvp(args.cmd[0], args.cmd)
        return sys.exit(1)  # will never get here

    if args.field_re:
        first_field_re = re.compile(args.field_re)
    else:
        field_sep = args.field_sep or '\t'
        first_field_re = re.compile('^([^'+re.escape(field_sep)+']+)')

    logging.basicConfig(level=getattr(logging, args.logging.upper()))

    stdout_mutex = threading.Lock()
    processes = []

    for x in range(args.nprocs):
        logging.debug("Starting %r (%d)", args.cmd, x)
        ps = subprocess.Popen(args.cmd,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE)
        psi = JobInputter(x, ps, Queue(maxsize=args.bufsize))
        pso = JobOutputter(x, ps, sys.stdout, stdout_mutex)
        psi.start()
        pso.start()
        processes.append((psi, pso))

    for line in sys.stdin:
        if not line:
            continue

        logging.debug("Read %d bytes from stdin", len(line))

        first_field_m = first_field_re.match(line)
        # lines with no match (e.g. an empty first field) share the empty key
        # instead of crashing on a None match
        first_field = first_field_m.group(0) if first_field_m else ''
        psi, _pso = hash_select(first_field, processes)
        logging.debug("Writing %d bytes to %r (%r)", len(line), psi, first_field)
        psi.iq.put(line)

    logging.debug("Hit eof on stdin")

    for x, (psi, pso) in enumerate(processes):
        logging.debug("Sending terminator to %d (%r)", x, psi)
        psi.iq.put(None)

    for x, (psi, pso) in enumerate(processes):
        logging.debug("Waiting for q %d (%r)", x, psi)
        psi.iq.join()
        logging.debug("Waiting for psi %d (%r)", x, psi)
        psi.join()
        logging.debug("Waiting for pso %d (%r)", x, pso)
        pso.join()

    return sys.exit(0)


if __name__ == '__main__':
    main()