mirror of
https://github.com/reddit-archive/reddit.git
synced 2026-04-27 03:00:12 -04:00
**Explanation**: compute_time_listings is slow. Really slow. At a quick glance, here are the jobs running right now:
date: Sun Jan 17 20:04:56 PST 2016
-rw-rw-r-- 1 ri ri 1.2G Jan 17 12:37 comment-week-data.dump
-rw-rw-r-- 1 ri ri 683M Jan 17 12:25 comment-week-thing.dump
-rw-rw-r-- 1 ri ri 53G Jan 16 07:13 comment-year-data.dump
-rw-rw-r-- 1 ri ri 31G Jan 16 04:37 comment-year-thing.dump
-rw-rw-r-- 1 ri ri 276M Jan 17 17:04 link-week-data.dump
-rw-rw-r-- 1 ri ri 70M Jan 17 17:03 link-week-thing.dump
So the currently running top-comments-by-year listing has been running for nearly 37 hours and isn't done. top-comments-by-week has been running for 8 hours. top-links-by-week has been running for 3 hours. And this is just me checking on currently running jobs, not actual completion times.
The slow bit is the actual writing to Cassandra in `write_permacache`. This is mostly because `write_permacache` is extremely naive and blocks waiting for individual writes with no batching or parallelisation. There are a lot of ways to work around this and some of them will become easier when we're no longer writing out to the permacache at all, but until then (and even after that) this approach lets us keep doing the simple-to-understand thing while parallelising some of the work.
**The approach**: `compute_time_listings` is written as a mapreduce job in our `mr_tools` toolkit, with `write_permacache` as the final reducer. In `mr_tools`, you can run multiple reducers as long as a given reducer is guaranteed to receive all of the records for the same key. So this patch adds `hashdist.py`, a tool that runs multiple copies of a target job and distributes lines to them from stdin, using each line's first tab-delimited field to meet this promise. (The same script could apply to mappers and sorts too, but in my tests for this job the gains were minimal because `write_permacache` is still the bottleneck up to a large number of reducers.)
**Numbers**: A top-links-by-hour listing in prod right now takes 1m46.387s to run. This patch reduces that to 0m43.960s using 2 jobs (roughly a 60% saving). The top-links-by-week job that I previously killed after 3 hours completed in 56m47.329s. The top-links-by-year job that I killed last week at over 36 hours finished in 19 hours.
**Downsides**: It costs some additional RAM: roughly 10mb for hashdist.py and 100mb in memory for each additional copy of the job. It multiplies the effective load on Cassandra by the number of jobs (although I have no reason to believe that it's practical to overload Cassandra this way right now; I've tested up to 5 jobs).
**Further work**: with this we could easily do sort|reducer fusion to significantly reduce the work required by the sorter. `hashdist.py` as written is pretty slow and is only acceptable because `write_permacache` is even slower; a non-Python implementation would be straightforward and way faster.
182 lines
5.3 KiB
Python
Executable File
182 lines
5.3 KiB
Python
Executable File
#!/usr/bin/env python2.7
|
|
|
|
import argparse
import errno
import logging
import multiprocessing
import os
import re
import string
import subprocess
import sys
import threading
from Queue import Queue
|
|
|
|
|
|
def parse_size(s):
    """
    Parse a size such as ``1024``, ``512b``, ``64k``, ``10m`` or ``2g`` into
    an integer.

    A string made up only of digits is returned as that integer. Otherwise
    the last character is treated as a unit suffix (``b`` = 1, ``k`` = 1024,
    ``m`` = 1024**2, ``g`` = 1024**3, upper or lower case) and everything
    before it must be an integer.

    Raises ValueError when the string can't be parsed. ValueError (rather
    than a bare Exception) matters because this function is used as an
    argparse ``type=`` callable, and argparse only turns ValueError,
    TypeError and ArgumentTypeError into a usage message.
    """
    multipliers = {
        'b': 1,
        'k': 1024,
        'm': 1024 * 1024,
        'g': 1024 * 1024 * 1024,
    }

    # the explicit emptiness check keeps '' from being handed to int()
    if s and all(x in string.digits for x in s):
        return int(s)

    suffix = s[-1:].lower()
    if suffix in multipliers:
        try:
            return int(s[:-1]) * multipliers[suffix]
        except ValueError:
            # non-numeric prefix (e.g. "k" or "1.5m"): fall through so the
            # caller sees one consistent error message
            pass

    raise ValueError("Can't parse %r" % (s,))
|
|
|
|
|
|
class JobInputter(threading.Thread):
    """
    Feeder thread for one child job.

    Pulls items off the queue `iq` and writes each one to the child's stdin,
    flushing after every write. An item of `None` is the end-of-input
    marker: the child's stdin is closed and the thread finishes.
    """
    def __init__(self, job_name, popen, iq):
        super(JobInputter, self).__init__()
        self.job_name = job_name
        self.popen = popen
        self.iq = iq

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.job_name)

    def run(self):
        while True:
            item = self.iq.get()
            logging.debug("%r got item %r", self, item)

            if item is not None:
                try:
                    child_stdin = self.popen.stdin
                    child_stdin.write(item)
                    child_stdin.flush()
                    # only acknowledge the item once it has been handed
                    # to the child
                    self.iq.task_done()
                except IOError:
                    # a failed write takes the whole program down, not
                    # just this thread
                    logging.exception("exception writing to popen %r", self.popen)
                    return os._exit(1)
                continue

            # terminator: close the child's stdin so it sees EOF, then
            # acknowledge the terminator itself and stop
            logging.debug("%r closing %r", self, self.popen.stdin)
            self.popen.stdin.close()
            self.iq.task_done()
            return
|
|
|
|
|
|
class JobOutputter(threading.Thread):
    """
    Takes output from the job and sends it to stdout

    Each line read from the child's stdout is written to `out_fd` while
    holding `lock`, which is shared between all outputters so that lines
    from different jobs are written one at a time.
    """
    def __init__(self, job_name, popen, out_fd, lock):
        self.job_name = job_name
        self.popen = popen
        self.out_fd = out_fd
        self.lock = lock
        super(JobOutputter, self).__init__()

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.job_name)

    def run(self):
        for line in self.popen.stdout:
            logging.debug("%r read %d bytes", self, len(line))
            with self.lock:
                try:
                    self.out_fd.write(line)
                except IOError as e:
                    # EPIPE means the reader of our output went away; that
                    # is not worth a traceback. Anything else gets logged.
                    # (This relies on the module-level `import errno`.)
                    if e.errno != errno.EPIPE:
                        logging.exception("exception writing to output %r", self.out_fd)
                    # either way there is nowhere left to write to, so
                    # take the whole program down, not just this thread
                    return os._exit(1)

        logging.debug("Got eof on %r", self)
|
|
|
|
|
|
def hash_select(key, choices):
    """
    Pick the element of `choices` that `key` hashes to.

    Within one process the same key always selects the same element, as
    long as `choices` keeps the same length and order.
    """
    bucket = hash(key) % len(choices)
    return choices[bucket]
|
|
|
|
|
|
def main():
    """
    Script entry point: run `_main` and hand back whatever it returns.

    A KeyboardInterrupt ends the process on the spot with exit status 1.
    """
    try:
        result = _main()
    except KeyboardInterrupt:
        # with this many threads in play a ^C has to be a hard kill:
        # os._exit ends the process immediately
        os._exit(1)
    return result
|
|
|
|
def _main():
    """
    Parse the command line, start N copies of `cmd`, and distribute stdin
    lines between them by key.

    The key of a line is whatever the key regex matches at the start of the
    line: by default the run of characters before the first field separator
    (-f, tab unless given), or a custom regex supplied with -r. Lines with
    the same key always go to the same copy of `cmd`. Output from all copies
    is written to our stdout.

    With -n 1 the command simply replaces this process.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', metavar='N', type=int,
                        default=multiprocessing.cpu_count(), dest='nprocs')
    parser.add_argument('-b', '--buffer', metavar='N', type=parse_size,
                        help="size (in lines) of input buffer for each process",
                        default=1024,
                        dest='bufsize')
    # -f defaults to None rather than '\t' so that "not given" can be told
    # apart from "given"; otherwise -r would always conflict with it. The
    # tab default is applied further down.
    parser.add_argument('-f', metavar='FIELDSEP', type=str, default=None,
                        dest='field_sep')
    parser.add_argument('-r', metavar='FIELDRE', type=str, default=None,
                        dest='field_re')
    parser.add_argument('--logging', help=argparse.SUPPRESS, default='error')
    parser.add_argument('cmd', nargs='+')

    args = parser.parse_args()

    if args.field_re is not None and args.field_sep is not None:
        # -f and -r are two ways of saying the same thing; refuse both.
        # Usage goes to stderr so it can't end up in the data stream.
        parser.print_usage(sys.stderr)
        return sys.exit(1)

    if args.nprocs < 1:
        # with no processes there is nothing to distribute lines to
        parser.error("-n must be at least 1")

    if args.field_sep is not None and not args.field_sep:
        # an empty separator would produce an invalid character class below
        parser.error("-f must not be empty")

    if args.nprocs == 1:
        # if you only want one, what do you need me for?
        os.execvp(args.cmd[0], args.cmd)
        return sys.exit(1)  # will never get here

    if args.field_re is not None:
        first_field_re = re.compile(args.field_re)
    else:
        field_sep = '\t' if args.field_sep is None else args.field_sep
        # the separator is used as a character class, so a multi-character
        # -f ends the key at the first occurrence of any of its characters
        first_field_re = re.compile('^([^' + re.escape(field_sep) + ']+)')

    logging.basicConfig(level=getattr(logging, args.logging.upper()))

    stdout_mutex = threading.Lock()
    processes = []

    for x in range(args.nprocs):
        logging.debug("Starting %r (%d)", args.cmd, x)
        ps = subprocess.Popen(args.cmd,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE)
        psi = JobInputter(x, ps, Queue(maxsize=args.bufsize))
        pso = JobOutputter(x, ps, sys.stdout, stdout_mutex)
        psi.start()
        pso.start()
        processes.append((psi, pso))

    for line in sys.stdin:
        if not line:
            continue

        logging.debug("Read %d bytes from stdin", len(line))

        first_field_m = first_field_re.match(line)
        if first_field_m is None:
            # e.g. the line starts with the separator, or a custom -r
            # didn't match: file all such lines under the empty key
            # instead of crashing on None.group()
            first_field = ''
        else:
            first_field = first_field_m.group(0)
        psi, _pso = hash_select(first_field, processes)
        logging.debug("Writing %d bytes to %r (%r)", len(line), psi, first_field)
        # blocks while that job's queue is full
        psi.iq.put(line)

    logging.debug("Hit eof on stdin")

    # tell every job that its input is finished...
    for x, (psi, pso) in enumerate(processes):
        logging.debug("Sending terminator to %d (%r)", x, psi)
        psi.iq.put(None)

    # ...then wait for each job's queue, input thread and output thread
    for x, (psi, pso) in enumerate(processes):
        logging.debug("Waiting for q %d (%r)", x, psi)
        psi.iq.join()
        logging.debug("Waiting for psi %d (%r)", x, psi)
        psi.join()
        logging.debug("Waiting for pso %d (%r)", x, psi)
        pso.join()

    return sys.exit(0)
|
|
|
|
|
|
# Run the distributor only when this file is executed as a script.
if __name__ == '__main__':
    main()
|