Added a new system for precomputing queries on reddit. This will allow every
listing to be precomputed and stored in memcache for retrieval at request-time.
Currently only reddit listings are precomputed. Profile-page precomputing is
disabled (and relatively untested).
commit 33fd4e9684 (parent 648fe4ca9c)
Author: Steve Huffman
Date: 2008-08-20 11:48:24 -07:00
20 changed files with 684 additions and 151 deletions
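Roughly, the new pieces appear intended to fit together as in the sketch below; only the imported names come from this diff, and the wiring shown is illustrative rather than the exact call sites.

# Hedged sketch of the precompute pipeline, using only functions added here.
from r2.lib.db import queries, query_queue

# Write path: after a submission or vote, the api controller calls
# queries.new_link(l) / queries.new_vote(v) (guarded by g.write_query_queue),
# which builds CachedResults for the affected listings and inserts them into
# the query_queue table.

# Backfill: enqueue every subreddit listing for precomputation.
queries.add_all_srs()

# Precompute daemon: pulls pickled CachedResults rows off the queue and POSTs
# each one to g.query_queue_worker + '/doquery', where QueryController
# unpickles it and calls update(), storing (fullname, sort_col...) tuples in
# g.query_cache (memcache). Request-time readers get them back through
# Subreddit.get_links() when g.use_query_cache is enabled.
query_queue.run()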

View File

@@ -60,6 +60,13 @@ email_engine = db_manager.get_engine(g.email_db_name,
pool_size = 2,
max_overflow = 2)
query_queue_engine = db_manager.get_engine(g.query_queue_db_name,
db_host = g.query_queue_db_host,
db_user = g.query_queue_db_user,
db_pass = g.query_queue_db_pass,
pool_size = 2,
max_overflow = 2)
dbm.type_db = main_engine
dbm.relation_type_db = main_engine

View File

@@ -97,7 +97,7 @@ def make_map(global_conf={}, app_conf={}):
mc('/', controller='hot', action='listing')
listing_controllers = "hot|saved|toplinks|new|recommended|normalized|randomrising"
listing_controllers = "hot|saved|toplinks|new|recommended|randomrising"
mc('/:controller', action='listing',
requirements=dict(controller=listing_controllers))
@@ -130,6 +130,8 @@ def make_map(global_conf={}, app_conf={}):
mc('/captcha/:iden', controller='captcha', action='captchaimg')
mc('/doquery', controller='query', action='doquery')
mc('/store', controller='redirect', action='redirect',
dest='http://store.reddit.com/index.html')
@@ -138,6 +140,7 @@ def make_map(global_conf={}, app_conf={}):
mc('/mobile', controller='redirect', action='redirect',
dest='http://m.reddit.com/')
# This route handles displaying the error page and
# graphics used in the 404/500

View File

@@ -21,7 +21,6 @@
################################################################################
from listingcontroller import ListingController
from listingcontroller import HotController
from listingcontroller import NormalizedController
from listingcontroller import SavedController
from listingcontroller import ToplinksController
from listingcontroller import NewController
@@ -43,6 +42,8 @@ from post import PostController
from toolbar import ToolbarController
from i18n import I18nController
from querycontroller import QueryController
try:
from r2admin.controllers.adminapi import ApiController
except ImportError:

View File

@@ -36,10 +36,11 @@ from r2.lib.pages import FriendList, ContributorList, ModList, \
from r2.lib.menus import CommentSortMenu
from r2.lib.translation import Translator
from r2.lib.normalized_hot import expire_hot, is_top_link
from r2.lib.normalized_hot import expire_hot
from r2.lib.captcha import get_iden
from r2.lib import emailer
from r2.lib.strings import strings
from r2.lib.db import queries
from r2.config import cache
from simplejson import dumps
@@ -281,6 +282,10 @@ class ApiController(RedditController):
if should_ratelimit:
VRatelimit.ratelimit(rate_user=True, rate_ip = True)
#update the queries
if g.write_query_queue:
queries.new_link(l)
#update the modified flags
set_last_modified(c.user, 'overview')
set_last_modified(c.user, 'submitted')
@@ -698,15 +703,21 @@ class ApiController(RedditController):
else False if dir < 0
else None)
organic = vote_type == 'organic'
Vote.vote(user, thing, dir, ip, spam, organic)
v = Vote.vote(user, thing, dir, ip, spam, organic)
#update last modified
set_last_modified(c.user, 'liked')
set_last_modified(c.user, 'disliked')
#update the queries
if g.write_query_queue:
queries.new_vote(v)
# flag search indexer that something has changed
tc.changed(thing)
@Json
@validate(VUser(),
VModhash(),

View File

@@ -102,3 +102,5 @@ class ErrorSet(object):
def remove(self, error_name):
if self.errors.has_key(error_name):
del self.errors[error_name]
class UserRequiredException(Exception): pass

View File

@@ -37,7 +37,6 @@ from pylons.i18n import _
import random
class ListingController(RedditController):
"""Generalized controller for pages with lists of links."""
@@ -54,15 +53,13 @@ class ListingController(RedditController):
# any text that should be shown on the top of the page
infotext = None
# builder class to use to generate the listing
builder_cls = QueryBuilder
# builder class to use to generate the listing. if none, we'll try
# to figure it out based on the query type
builder_cls = None
# page title
title_text = ''
# toggles the stat collector for keeping track of what queries are being run
collect_stats = False
# login box, subreddit box, submit box, etc, visible
show_sidebar = True
@@ -86,10 +83,6 @@ class ListingController(RedditController):
self.reverse = reverse
self.query_obj = self.query()
if self.collect_stats and g.REDDIT_MAIN:
self.query_obj._stats_collector = g.stats_collector
self.builder_obj = self.builder()
self.listing_obj = self.listing()
content = self.content()
@@ -111,14 +104,21 @@ class ListingController(RedditController):
def builder(self):
#store the query itself so it can be used elsewhere
b = self.builder_cls(self.query_obj,
num = self.num,
skip = self.skip,
after = self.after,
count = self.count,
reverse = self.reverse,
prewrap_fn = self.prewrap_fn,
wrap = self.builder_wrapper)
if self.builder_cls:
builder_cls = self.builder_cls
elif isinstance(self.query_obj, Query):
builder_cls = QueryBuilder
elif isinstance(self.query_obj, list):
builder_cls = IDBuilder
b = builder_cls(self.query_obj,
num = self.num,
skip = self.skip,
after = self.after,
count = self.count,
reverse = self.reverse,
prewrap_fn = self.prewrap_fn,
wrap = self.builder_wrapper)
return b
def listing(self):
@@ -192,24 +192,24 @@ class HotController(FixListing, ListingController):
def query(self):
#no need to worry when working from the cache
if g.use_query_cache or c.site == Default:
fix_listing = False
if c.site == Default:
self.fix_listing = False
self.builder_cls = IDBuilder
user = c.user if c.user_is_loggedin else None
sr_ids = Subreddit.user_subreddits(user)
links = normalized_hot(sr_ids)
return links
elif (not isinstance(c.site, FakeSubreddit)
#if not using the query_cache we still want cached front pages
elif (not g.use_query_cache
and not isinstance(c.site, FakeSubreddit)
and self.after is None
and self.count == 0):
self.builder_cls = IDBuilder
links = [l._fullname for l in get_hot(c.site)]
return links
else:
q = Link._query(sort = desc('_hot'), *c.site.query_rules())
q._read_cache = True
self.collect_stats = True
return q
return c.site.get_links('hot', 'all')
def content(self):
# only send an organic listing for HTML rendering
@@ -228,21 +228,6 @@ class HotController(FixListing, ListingController):
self.infotext = request.get.get('deleted') and strings.user_deleted
return ListingController.GET_listing(self, **env)
class NormalizedController(ListingController):
where = 'normalized'
builder_cls = IDBuilder
def query(self):
user = c.user if c.user_is_loggedin else None
srs = Subreddit._byID(Subreddit.user_subreddits(user),
data = True,
return_dict = False)
links = normalized_hot(srs)
return links
def title(self):
return c.site.title
class SavedController(ListingController):
prewrap_fn = lambda self, x: x._thing2
where = 'saved'
@@ -283,23 +268,15 @@ class NewController(ListingController):
return [NewMenu(default = self.sort)]
def query(self):
sort = NewMenu.operator(self.sort)
if not sort: # rising
names = get_rising(c.site)
return names
if self.sort == 'rising':
return get_rising(c.site)
else:
q = Link._query(sort = sort, read_cache = True,
*c.site.query_rules() )
self.collect_stats = True
return q
return c.site.get_links('new', 'all')
@validate(VSrMask('srs'),
sort = VMenu('controller', NewMenu))
def GET_listing(self, sort, **env):
self.sort = sort
if self.sort == 'rising':
self.builder_cls = IDBuilder
return ListingController.GET_listing(self, **env)
class BrowseController(ListingController):
@@ -310,17 +287,7 @@ class BrowseController(ListingController):
return [ControversyTimeMenu(default = self.time)]
def query(self):
q = Link._query(sort = SortMenu.operator(self.sort),
read_cache = True,
*c.site.query_rules())
if g.REDDIT_MAIN:
q._stats_collector = g.stats_collector
t = TimeMenu.operator(self.time)
if t: q._filter(t)
return q
return c.site.get_links(self.sort, self.time)
# TODO: this is a hack with sort.
@validate(VSrMask('srs'),
@@ -338,7 +305,6 @@ class BrowseController(ListingController):
class RandomrisingController(ListingController):
where = 'randomrising'
builder_cls = IDBuilder
title_text = _('you\'re really bored now, eh?')
def query(self):
@@ -347,10 +313,10 @@ class RandomrisingController(ListingController):
if not links:
# just pull from the new page if the rising page isn't
# populated for some reason
q = Link._query(limit = 200,
data = True,
sort = desc('_date'))
links = [ x._fullname for x in q ]
links = c.site.get_links('new', 'all')
if isinstance(links, Query):
links._limit = 200
links = [x._fullname for x in links]
random.shuffle(links)
@@ -358,7 +324,6 @@ class RandomrisingController(ListingController):
class RecommendedController(ListingController):
where = 'recommended'
builder_cls = IDBuilder
title_text = _('recommended for you')
@property
@@ -437,9 +402,7 @@ class MessageController(ListingController):
message = message,
success = success)
return MessagePage(content = content).render()
class RedditsController(ListingController):
render_cls = SubredditsPage

View File

@@ -0,0 +1,15 @@
from reddit_base import RedditController
from validator import *
from r2.lib.db.queries import CachedResults
import cPickle as pickle
from urllib import unquote
class QueryController(RedditController):
@validate(query = nop('query'))
def POST_doquery(self, query):
if g.enable_doquery:
cr = pickle.loads(query)
cr.update()
else:
abort(403, 'forbidden')

View File

@@ -31,7 +31,7 @@ from r2.lib.jsonresponse import json_respond
from r2.models import *
from r2.controllers.errors import errors
from r2.controllers.errors import errors, UserRequiredException
from copy import copy
from datetime import datetime, timedelta
@@ -66,14 +66,24 @@ class Validator(object):
def validate(*simple_vals, **param_vals):
def val(fn):
def newfn(self, *a, **env):
for validator in simple_vals:
validator(env)
try:
for validator in simple_vals:
validator(env)
kw = self.build_arg_list(fn, env)
for var, validator in param_vals.iteritems():
kw[var] = validator(env)
return fn(self, *a, **kw)
kw = self.build_arg_list(fn, env)
for var, validator in param_vals.iteritems():
kw[var] = validator(env)
except UserRequiredException:
d = dict(dest=reddit_link(request.path, url = True)
+ utils.query_string(request.GET))
path = "/login"
if request.environ.get('extension'):
path += ".%s" % request.environ['extension']
return redirect_to(path + utils.query_string(d))
return fn(self, *a, **kw)
return newfn
return val
@@ -303,12 +313,8 @@ class VCaptcha(Validator):
class VUser(Validator):
def run(self, password = None):
if not c.user_is_loggedin:
#TODO return a real error page
d = dict(dest=reddit_link(request.path, url = True) + utils.query_string(request.GET))
path = "/login"
if request.environ.get('extension'):
path += ".%s" % request.environ['extension']
return redirect_to(path + utils.query_string(d))
raise UserRequiredException
if (password is not None) and not valid_password(c.user, password):
c.errors.add(errors.WRONG_PASSWORD)

View File

@@ -41,18 +41,23 @@ class Globals(object):
'num_comments',
'max_comments',
'num_side_reddits',
'num_precompute_workers',
]
bool_props = ['debug', 'translator',
'template_debug',
'uncompressedJS']
'uncompressedJS',
'enable_doquery',
'use_query_cache',
'write_query_queue']
tuple_props = ['memcaches',
'rec_cache',
'admins',
'monitored_servers',
'default_srs',
'agents']
'agents',
'query_caches']
def __init__(self, global_conf, app_conf, paths, **extra):
"""
@@ -103,6 +108,7 @@ class Globals(object):
self.cache = CacheChain((LocalCache(), mc))
self.rec_cache = Memcache(self.rec_cache)
self.query_cache = Memcache(self.query_caches)
# set default time zone if one is not set
self.tz = pytz.timezone(global_conf.get('timezone'))

View File

@@ -59,6 +59,10 @@ class BaseController(WSGIController):
else:
request.ip = environ['REMOTE_ADDR']
#if x-dont-decode is set, pylons won't unicode all the parameters
if environ.get('HTTP_X_DONT_DECODE'):
request.charset = None
request.get = storify(request.GET)
request.post = storify(request.POST)
request.referer = environ.get('HTTP_REFERER')

r2/r2/lib/db/queries.py (new file, 293 lines added)
View File

@@ -0,0 +1,293 @@
from r2.models import Account, Link, Comment, Vote, SaveHide
from r2.models import Message, Inbox, Subreddit
from r2.lib.db.thing import Thing
from r2.lib.db.operators import asc, desc, timeago
from r2.lib.db import query_queue
from r2.lib.db.sorts import epoch_seconds
from r2.lib.utils import fetch_things2, worker
from datetime import datetime
from pylons import g
query_cache = g.query_cache
precompute_limit = 1000
db_sorts = dict(hot = desc('_hot'),
new = desc('_date'),
top = desc('_score'),
controversial = desc('_controversy'),
old = asc('_date'))
db_times = dict(all = None,
hour = Thing.c._date >= timeago('1 hour'),
day = Thing.c._date >= timeago('1 day'),
week = Thing.c._date >= timeago('1 week'),
month = Thing.c._date >= timeago('1 month'),
year = Thing.c._date >= timeago('1 year'))
#we need to define the filter functions here so cachedresults can be pickled
def filter_identity(x):
return x
def filter_thing2(x):
"""A filter to apply to the results of a relationship query returns
the object of the relationship."""
return x._thing2
class CachedResults(object):
"""Given a query returns a list-like object that will lazily look up
the query from the persistent cache. """
def __init__(self, query, filter = filter_identity):
self.query = query
self.query._limit = precompute_limit
self.filter = filter
self.iden = self.query._iden()
self.data = []
self._fetched = False
def fetch(self):
"""Loads the query from the cache."""
if not self._fetched:
self._fetched = True
self.data = query_cache.get(self.iden) or []
return list(self)
def update(self):
"""Runs the query and stores the result in the cache. It also stores
the columns relevant to the sort to make merging with other
results faster."""
self.data = []
sort_cols = [s.col for s in self.query._sort]
for i in self.query:
item = self.filter(i)
l = [item._fullname]
for col in sort_cols:
attr = getattr(item, col)
#convert dates to epochs to take less space
if isinstance(attr, datetime):
attr = epoch_seconds(attr)
l.append(attr)
self.data.append(tuple(l))
self._fetched = True
query_cache.set(self.iden, self.data)
def __repr__(self):
return '<CachedResults %s %s>' % (self.query._rules, self.query._sort)
def __iter__(self):
if not self._fetched:
self.fetch()
for x in self.data:
yield x[0]
def merge_results(*results):
"""Given two CachedResults, mergers their lists based on the sorts of
their queries."""
if len(results) == 1:
return results[0]
#make sure the sorts match
sort = results[0].query._sort
assert(all(r.query._sort == sort for r in results[1:]))
def thing_cmp(t1, t2):
for i, s in enumerate(sort):
#t1 and t2 are tuples of (fullname, *sort_cols), so we can
#get the value to compare right out of the tuple
v1, v2 = t1[i + 1], t2[i + 1]
if v1 != v2:
return cmp(v1, v2) if isinstance(s, asc) else cmp(v2, v1)
#they're equal
return 0
all_items = []
for r in results:
r.fetch()
all_items.extend(r.data)
#all_items = Thing._by_fullname(all_items, return_dict = False)
return [i[0] for i in sorted(all_items, cmp = thing_cmp)]
def get_links(sr, sort, time):
"""General link query for a subreddit."""
q = Link._query(Link.c.sr_id == sr._id,
sort = db_sorts[sort])
if time != 'all':
q._filter(db_times[time])
return CachedResults(q)
def user_query(kind, user, sort, time):
"""General profile-page query."""
q = kind._query(kind.c.author_id == user._id,
kind.c._spam == (True, False),
sort = db_sorts[sort])
if time != 'all':
q._filter(db_times[time])
return CachedResults(q)
def get_comments(user, sort, time):
return user_query(Comment, user, sort, time)
def get_submitted(user, sort, time):
return user_query(Link, user, sort, time)
def get_overview(user, sort, time):
return merge_results(get_comments(user, sort, time),
get_submitted(user, sort, time))
def user_rel_query(rel, user, name):
"""General user relationship query."""
q = rel._query(rel.c._thing1_id == user._id,
rel.c._t2_deleted == False,
rel.c._name == name,
sort = desc('_date'),
eager_load = True)
return CachedResults(q, filter_thing2)
vote_rel = Vote.rel(Account, Link)
def get_liked(user):
return user_rel_query(vote_rel, user, '1')
def get_disliked(user):
return user_rel_query(vote_rel, user, '-1')
def get_hidden(user):
return user_rel_query(SaveHide, user, 'hide')
def get_saved(user):
return user_rel_query(SaveHide, user, 'save')
inbox_message_rel = Inbox.rel(Account, Message)
def get_inbox_messages(user):
return user_rel_query(inbox_message_rel, user, 'inbox')
inbox_comment_rel = Inbox.rel(Account, Comment)
def get_inbox_comments(user):
return user_rel_query(inbox_comment_rel, user, 'inbox')
def get_inbox(user):
return merge_results(get_inbox_comments(user),
get_inbox_messages(user))
def get_sent(user):
q = Message._query(Message.c.author_id == user._id,
Message.c._spam == (True, False),
sort = desc('_date'))
return CachedResults(q)
#can be rewritten to be more efficient
def all_queries(fn, obj, *param_lists):
"""Given a fn and a first argument 'obj', calls the fn(obj, *params)
for every permutation of the parameters in param_lists"""
results = []
params = [[obj]]
for pl in param_lists:
new_params = []
for p in pl:
for c in params:
new_param = list(c)
new_param.append(p)
new_params.append(new_param)
params = new_params
results = [fn(*p) for p in params]
return results
def display_jobs(jobs):
for r in jobs:
print r
print len(jobs)
def add_queries(queries):
"""Adds multiple queries to the query queue"""
def _add_queries():
for q in queries:
query_queue.add_query(q)
worker.do(_add_queries)
## The following functions should be called after their respective
## actions to update the correct listings.
def new_link(link):
sr = Subreddit._byID(link.sr_id)
#author = Account._byID(link.author_id)
results = all_queries(get_links, sr, ('hot', 'new', 'old'), ['all'])
results.extend(all_queries(get_links, sr, ('top', 'controversial'), db_times.keys()))
#results.extend(all_queries(get_submitted, author,
# db_sorts.keys(),
# db_times.keys()))
add_queries(results)
def new_comment(comment):
author = Account._byID(comment.author_id)
results = all_queries(get_comments, author, db_sorts.keys(), db_times.keys())
if hasattr(comment, 'parent_id'):
parent = Comment._byID(comment.parent_id, data = True)
parent_author = Account._byID(parent.author_id)
results.append(get_inbox_comments(parent_author))
add_queries(results)
def new_vote(vote):
user = vote._thing1
item = vote._thing2
if not isinstance(item, Link):
return
sr = item.subreddit_slow
results = all_queries(get_links, sr, ('hot', 'new', 'old'), ['all'])
results.extend(all_queries(get_links, sr, ('top', 'controversial'), db_times.keys()))
#must update both because we don't know if it's a changed vote
#results.append(get_liked(user))
#results.append(get_disliked(user))
add_queries(results)
def new_message(message):
from_user = Account._byID(message.author_id)
to_user = Account._byID(message.to_id)
results = [get_sent(from_user)]
results.append(get_inbox_messages(to_user))
add_queries(results)
def new_savehide(savehide):
user = savehide._thing1
if savehide._name == 'save':
results = [get_saved(user)]
else:
results = [get_hidden(user)]
add_queries(results)
def add_all_srs():
"""Adds every listing query for every subreddit to the queue."""
q = Subreddit._query(sort = asc('_date'))
for sr in fetch_things2(q):
add_queries(all_queries(get_links, sr, ('hot', 'new', 'old'), ['all']))
add_queries(all_queries(get_links, sr, ('top', 'controversial'), db_times.keys()))
def add_all_users():
"""Adds every profile-page query for every user to the queue"""
q = Account._query(sort = asc('_date'))
for user in fetch_things2(q):
queries = []
queries.extend(all_queries(get_submitted, user, db_sorts.keys(), db_times.keys()))
queries.extend(all_queries(get_comments, user, db_sorts.keys(), db_times.keys()))
queries.append(get_inbox_messages(user))
queries.append(get_inbox_comments(user))
queries.append(get_saved(user))
queries.append(get_hidden(user))
queries.append(get_liked(user))
queries.append(get_disliked(user))
add_queries(queries)
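For orientation, a read-side consumer of the module above might look like the following hedged sketch; the helper name is illustrative and not part of this file.

# Illustrative read-side use of the API defined above (not part of the commit).
from r2.lib.db.queries import get_links

def cached_hot_fullnames(sr):
    """Return the precomputed 'hot' fullnames for a subreddit: get_links()
    builds the CachedResults wrapper and fetch() loads the stored
    (fullname, sort_value...) tuples from g.query_cache, yielding just the
    fullnames that the listing controllers hand to an IDBuilder."""
    return get_links(sr, 'hot', 'all').fetch()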

r2/r2/lib/db/query_queue.py (new file, 116 lines added)
View File

@@ -0,0 +1,116 @@
from __future__ import with_statement
from r2.lib.workqueue import WorkQueue
from r2.config.databases import query_queue_engine, tz
from r2.lib.db.tdb_sql import make_metadata, settings, create_table, index_str
import cPickle as pickle
from datetime import datetime
from urllib2 import Request, urlopen
from urllib import urlencode
from threading import Lock
import time
import sqlalchemy as sa
from sqlalchemy.exceptions import SQLError
from pylons import g
#idens of the queries that are currently being run
running = set()
running_lock = Lock()
def make_query_queue_table():
metadata = make_metadata(query_queue_engine)
table = sa.Table(settings.DB_APP_NAME + '_query_queue', metadata,
sa.Column('iden', sa.String, primary_key = True),
sa.Column('query', sa.Binary),
sa.Column('date', sa.DateTime(timezone = True)))
date_idx = index_str(table, 'date', 'date')
create_table(table, [date_idx])
return table
query_queue_table = make_query_queue_table()
def add_query(cached_results):
"""Adds a CachedResults instance to the queue db, ignoring duplicates"""
d = dict(iden = cached_results.query._iden(),
query = pickle.dumps(cached_results, -1),
date = datetime.now(tz))
try:
query_queue_table.insert().execute(d)
except SQLError, e:
#don't worry about inserting duplicates
if not 'IntegrityError' in e.message:
raise
def remove_query(iden):
"""Removes a row identified with iden from the query queue. To be
called after a CachedResults is updated."""
table = query_queue_table
d = table.delete(table.c.iden == iden)
d.execute()
def get_query():
"""Gets the next query off the queue, ignoring the currently running
queries."""
table = query_queue_table
s = table.select(order_by = sa.asc(table.c.date), limit = 1)
s.append_whereclause(sa.and_(*[table.c.iden != i for i in running]))
r = s.execute().fetchone()
if r:
return r.iden, r.query
else:
return None, None
def make_query_job(iden, pickled_cr):
"""Creates a job to send to the query worker. Updates a cached result
then removes the query from both the queue and the running set. If
sending the job fails, the query is only removed from the running
set."""
precompute_worker = g.query_queue_worker
def job():
try:
finished = False
r = Request(url = precompute_worker + '/doquery',
data = urlencode([('query', pickled_cr)]),
#this header prevents pylons from turning the
#parameter into unicode, which breaks pickling
headers = {'x-dont-decode':'true'})
urlopen(r)
finished = True
finally:
with running_lock:
running.remove(iden)
#if finished is false, we'll leave the query in the db
#so we can try again later (e.g. in the event the
#worker is down)
if finished:
remove_query(iden)
return job
def run():
"""Pull jobs from the queue, creates a job, and sends them to a
WorkQueue for processing."""
num_workers = g.num_query_queue_workers
wq = WorkQueue(num_workers = num_workers)
wq.start()
while True:
job = None
#limit the total number of jobs in the WorkQueue. we don't
#need to load the entire db queue right away (the db queue can
#get quite large).
if wq.jobs.qsize() < 2 * num_workers:
with running_lock:
iden, pickled_cr = get_query()
if pickled_cr is not None:
if not iden in running:
running.add(iden)
job = make_query_job(iden, pickled_cr)
wq.add(job)
#if we didn't find a job, sleep before trying again
if not job:
time.sleep(1)
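Equivalent in effect to make_query_job() plus the /doquery handler, a single job boils down to the following hedged, in-process sketch (no HTTP hop, no 'running' bookkeeping); process_one is an illustrative name, not part of this file.

import cPickle as pickle
from r2.lib.db.query_queue import get_query, remove_query

def process_one():
    """Run one queued precompute job in-process: unpickle the stored
    CachedResults, execute its query (writing the results to memcache),
    then drop the row so it is not re-run."""
    iden, pickled_cr = get_query()        # oldest row in the queue table
    if pickled_cr is None:
        return False
    cr = pickle.loads(str(pickled_cr))    # column holds a pickled CachedResults
    cr.update()                           # run the query, set g.query_cache
    remove_query(iden)
    return True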

View File

@@ -60,47 +60,47 @@ def create_table(table, index_commands=None):
for i in index_commands:
t.engine.execute(i)
def index_commands(table, type):
def index_str(name, on, where = None):
index_str = 'create index idx_%s_' % name
index_str += table.name
index_str += ' on '+ table.name + ' (%s)' % on
if where:
index_str += ' where %s' % where
return index_str
def index_str(table, name, on, where = None):
index_str = 'create index idx_%s_' % name
index_str += table.name
index_str += ' on '+ table.name + ' (%s)' % on
if where:
index_str += ' where %s' % where
return index_str
def index_commands(table, type):
commands = []
if type == 'thing':
commands.append(index_str('id', 'thing_id'))
commands.append(index_str('date', 'date'))
commands.append(index_str('deleted_spam', 'deleted, spam'))
commands.append(index_str('hot', 'hot(ups, downs, date), date'))
commands.append(index_str('score', 'score(ups, downs), date'))
commands.append(index_str('controversy', 'controversy(ups, downs), date'))
commands.append(index_str(table, 'id', 'thing_id'))
commands.append(index_str(table, 'date', 'date'))
commands.append(index_str(table, 'deleted_spam', 'deleted, spam'))
commands.append(index_str(table, 'hot', 'hot(ups, downs, date), date'))
commands.append(index_str(table, 'score', 'score(ups, downs), date'))
commands.append(index_str(table, 'controversy', 'controversy(ups, downs), date'))
elif type == 'data':
commands.append(index_str('id', 'thing_id'))
commands.append(index_str('thing_id', 'thing_id'))
commands.append(index_str('key_value', 'key, substring(value, 1, %s)' \
commands.append(index_str(table, 'id', 'thing_id'))
commands.append(index_str(table, 'thing_id', 'thing_id'))
commands.append(index_str(table, 'key_value', 'key, substring(value, 1, %s)' \
% max_val_len))
#lower name
commands.append(index_str('lower_key_value', 'key, lower(value)',
commands.append(index_str(table, 'lower_key_value', 'key, lower(value)',
where = "key = 'name'"))
#ip
commands.append(index_str('ip_network', 'ip_network(value)',
commands.append(index_str(table, 'ip_network', 'ip_network(value)',
where = "key = 'ip'"))
#base_url
commands.append(index_str('base_url', 'base_url(lower(value))',
commands.append(index_str(table, 'base_url', 'base_url(lower(value))',
where = "key = 'url'"))
elif type == 'rel':
commands.append(index_str('thing1_name_date', 'thing1_id, name, date'))
commands.append(index_str('thing2_name_date', 'thing2_id, name, date'))
commands.append(index_str('thing1_id', 'thing1_id'))
commands.append(index_str('thing2_id', 'thing2_id'))
commands.append(index_str('name', 'name'))
commands.append(index_str('date', 'date'))
commands.append(index_str(table, 'thing1_name_date', 'thing1_id, name, date'))
commands.append(index_str(table, 'thing2_name_date', 'thing2_id, name, date'))
commands.append(index_str(table, 'thing1_id', 'thing1_id'))
commands.append(index_str(table, 'thing2_id', 'thing2_id'))
commands.append(index_str(table, 'name', 'name'))
commands.append(index_str(table, 'date', 'date'))
return commands
def get_type_table(metadata):

View File

@@ -792,7 +792,7 @@ class Query(object):
lst = None
if self._read_cache:
names = cache.get(self._iden())
if names:
if names is not None:
lst = Thing._by_fullname(names, data = self._data, return_dict = False)
if lst is None:

View File

@@ -102,7 +102,7 @@ def process_new_links(period = media_period, force = False):
jobs.append(make_link_info_job(results, link, g.useragent))
#send links to a queue
wq = WorkQueue(jobs, num_workers = 20)
wq = WorkQueue(jobs, num_workers = 20, timeout = 30)
wq.start()
wq.jobs.join()

View File

@@ -24,6 +24,7 @@ from r2.lib.db.operators import desc, timeago
from r2.lib import utils
from r2.config import cache
from r2.lib.memoize import memoize
from r2.lib.db.thing import Query
from pylons import g
@@ -43,18 +44,19 @@ def top_key(sr):
return sr.name + '_top'
def expire_hot(sr):
"""Called when a subreddit should be recomputed: after a vote (hence,
submit) or deletion."""
cache.set(expire_key(sr), True)
def is_top_link(sr, link):
return cache.get(top_key(sr)) == link._fullname
#def is_top_link(sr, link):
# return cache.get(top_key(sr)) == link._fullname
def get_hot(sr):
q = Link._query(Link.c.sr_id == sr._id,
sort = desc('_hot'),
write_cache = True,
limit = 150)
iden = q._iden()
def cached_query(query, sr):
"""Returns the results from running query. The results are cached and
only recomputed after 'expire_delta'"""
query._limit = 150
query._write_cache = True
iden = query._iden()
read_cache = True
#if query is in the cache, the expire flag is true, and the access
@@ -70,25 +72,38 @@ def get_hot(sr):
else:
read_cache = False
#set access time to the last time the query was actually run (now)
if not read_cache:
cache.set(access_key(sr), datetime.now())
q._read_cache = read_cache
res = list(q)
query._read_cache = read_cache
res = list(query)
#set the #1 link so we can ignore it later. expire after TOP_CACHE
#just in case something happens and that sr doesn't update
if res:
cache.set(top_key(sr), res[0]._fullname, TOP_CACHE)
#if res:
# cache.set(top_key(sr), res[0]._fullname, TOP_CACHE)
return res
def get_hot(sr):
"""Get the hottest links for a subreddit. If g.use_query_cache is
True, it'll use the query cache, otherwise it'll use cached_query()
from above."""
q = sr.get_links('hot', 'all')
if isinstance(q, Query):
return cached_query(q, sr)
else:
return Link._by_fullname(q[:150], return_dict = False)
def only_recent(items):
return filter(lambda l: l._date > utils.timeago('%d day' % g.HOT_PAGE_AGE),
items)
@memoize('normalize_hot', time = g.page_cache_time)
def normalized_hot_cached(sr_ids):
"""Fetches the hot lists for each subreddit, normalizes the scores,
and interleaves the results."""
results = []
srs = Subreddit._byID(sr_ids, data = True, return_dict = False)
for sr in srs:

View File

@@ -21,7 +21,7 @@
################################################################################
from r2.models import *
from r2.lib.memoize import memoize
from r2.lib.normalized_hot import is_top_link, get_hot, only_recent
from r2.lib.normalized_hot import get_hot, only_recent
from r2.lib import count
import random

View File

@@ -32,39 +32,40 @@ class WorkQueue(object):
"""A WorkQueue is a queue that takes a number of functions and runs
them in parallel"""
def __init__(self, jobs, num_workers = 5, timeout = 30):
def __init__(self, jobs = [], num_workers = 5, timeout = None):
"""Creates a WorkQueue that will process jobs with num_workers
threads. If a job takes longer than timeout seconds to run, WorkQueue
won't wait for it to finish before claiming to be finished."""
self.jobs = Queue()
self.work_count = Queue(num_workers)
self.workers = {}
self.timeout = timedelta(seconds = timeout)
if timeout:
self.timeout = timedelta(seconds = timeout)
else:
self.timeout = None
for j in jobs:
self.jobs.put(j)
def monitor(self):
done = False
while not done:
if self.jobs.empty() and not self.workers:
done = True
"""The monitoring thread. Every second it checks for finished, dead,
or timed-out jobs and removes them from the queue."""
while True:
for worker, start_time in self.workers.items():
if (not worker.isAlive() or
datetime.now() - start_time > self.timeout):
self.timeout
and datetime.now() - start_time > self.timeout):
self.work_count.get_nowait()
self.jobs.task_done()
del self.workers[worker]
time.sleep(1)
def start(self):
monitor_thread = Thread(target = self.monitor)
monitor_thread.setDaemon(True)
monitor_thread.start()
while not self.jobs.empty():
def run(self):
"""The main thread for the queue. Pull a job off the job queue and
create a thread for it."""
while True:
job = self.jobs.get()
work_thread = Thread(target = job)
@@ -73,18 +74,47 @@ class WorkQueue(object):
self.workers[work_thread] = datetime.now()
work_thread.start()
if __name__ == '__main__':
def start(self):
"""Spawn a monitoring thread and the main thread for this queue. """
monitor_thread = Thread(target = self.monitor)
monitor_thread.setDaemon(True)
monitor_thread.start()
main_thread = Thread(target = self.run)
main_thread.setDaemon(True)
main_thread.start()
def add(self, job):
"""Put a new job on the queue."""
self.jobs.put(job)
def wait(self):
"""Blocks until every job that has been added to the queue is
finished."""
self.jobs.join()
def test():
def make_job(n):
import random, time
def job():
print 'starting %s' % n
blah
time.sleep(random.randint(1, 10))
print 'ending %s' % n
return job
jobs = [make_job(n) for n in xrange(10)]
wq = WorkQueue(jobs, timeout = 2)
wq = WorkQueue(jobs, timeout = 5)
wq.start()
wq.jobs.join()
wq.wait()
#wq = WorkQueue()
#wq.start()
#wq.add(make_job(10))
#print 'added job'
#wq.add(make_job(3))
#print 'added another'
#q.wait()
print 'DONE'

View File

@@ -30,6 +30,7 @@ from r2.lib.db.operators import lower, or_, and_, desc
from r2.lib.memoize import memoize, clear_memo
from r2.lib.utils import tup
from r2.lib.strings import strings, Score
import os.path
class Subreddit(Thing, Printable):
@@ -199,6 +200,14 @@ class Subreddit(Thing, Printable):
#really we mean Link.c.sr_id, but rules are type agnostic
return (self.c.sr_id == self._id,)
def get_links(self, sort, time):
from r2.lib.db import queries
q = queries.get_links(self, sort, time)
if g.use_query_cache:
return q.fetch()
else:
return q.query
@classmethod
def add_props(cls, user, wrapped):
names = ('subscriber', 'moderator', 'contributor')
@@ -345,6 +354,20 @@ class FriendsSR(FakeSubreddit):
else:
return (self.c.sr_id == self.default_srs(c.content_langs, ids = True),)
def get_links(self, sort, time):
from r2.lib.db import queries
from r2.models import Link
from r2.controllers.errors import UserRequiredException
if not c.user_is_loggedin:
raise UserRequiredException
q = Link._query(self.c.author_id == c.user.friends,
sort = queries.db_sorts[sort])
if time != 'all':
q._filter(queries.db_times[time])
return q
class AllSR(FakeSubreddit):
name = 'all'
title = 'all'
@@ -355,6 +378,15 @@ class AllSR(FakeSubreddit):
else:
return ()
def get_links(self, sort, time):
from r2.models import Link
from r2.lib.db import queries
q = Link._query(sort = queries.db_sorts[sort])
if time != 'all':
q._filter(queries.db_times[time])
return q
class DefaultSR(FakeSubreddit):
#notice the space before reddit.com
name = ' reddit.com'
@@ -366,6 +398,26 @@ class DefaultSR(FakeSubreddit):
subreddits = Subreddit.user_subreddits(user)
return (self.c.sr_id == subreddits,)
def get_links_srs(self, srs, sort, time):
from r2.lib.db import queries
from r2.models import Link
if g.use_query_cache:
results = []
for sr in srs:
results.append(queries.get_links(sr, sort, time))
return queries.merge_results(*results)
else:
q = Link._query(sort = queries.db_sorts[sort])
if time != 'all':
q._filter(queries.db_times[time])
return q
def get_links(self, sort, time):
user = c.user if c.user_is_loggedin else None
srs = Subreddit._byID(Subreddit.user_subreddits(user), return_dict = False)
return self.get_links_srs(srs, sort, time)
@property
def title(self):
return _("reddit.com: what's new online!")
@@ -394,6 +446,13 @@ class MaskedSR(DefaultSR):
subreddits.extend(self.show_sr)
return (self.c.sr_id == subreddits,)
def get_links(self, sort, time):
user = c.user if c.user_is_loggedin else None
subreddits = Subreddit.user_subreddits(user)
subreddits = [s for s in subreddits if s not in self.hide_sr]
subreddits.extend(self.show_sr)
return self.get_links_srs(subreddits, sort, time)
class SubSR(FakeSubreddit):
stylesheet = 'subreddit.css'

View File

@@ -120,6 +120,8 @@ class Vote(MultiRelation('vote',
if kind == 'link' and v.valid_thing:
expire_hot(sr)
return v
#TODO make this generic and put on multirelation?
@classmethod
def likes(cls, sub, obj):