diff --git a/r2/r2/config/databases.py b/r2/r2/config/databases.py
index 729e26ecf..48f53b87a 100644
--- a/r2/r2/config/databases.py
+++ b/r2/r2/config/databases.py
@@ -60,6 +60,13 @@
 email_engine = db_manager.get_engine(g.email_db_name,
                                      pool_size = 2,
                                      max_overflow = 2)
 
+query_queue_engine = db_manager.get_engine(g.query_queue_db_name,
+                                           db_host = g.query_queue_db_host,
+                                           db_user = g.query_queue_db_user,
+                                           db_pass = g.query_queue_db_pass,
+                                           pool_size = 2,
+                                           max_overflow = 2)
+
 dbm.type_db = main_engine
 dbm.relation_type_db = main_engine
diff --git a/r2/r2/config/routing.py b/r2/r2/config/routing.py
index 53de7e890..93c79bc1b 100644
--- a/r2/r2/config/routing.py
+++ b/r2/r2/config/routing.py
@@ -97,7 +97,7 @@ def make_map(global_conf={}, app_conf={}):
 
     mc('/', controller='hot', action='listing')
 
-    listing_controllers = "hot|saved|toplinks|new|recommended|normalized|randomrising"
+    listing_controllers = "hot|saved|toplinks|new|recommended|randomrising"
 
     mc('/:controller', action='listing',
        requirements=dict(controller=listing_controllers))
@@ -130,6 +130,8 @@ def make_map(global_conf={}, app_conf={}):
 
     mc('/captcha/:iden', controller='captcha', action='captchaimg')
 
+    mc('/doquery', controller='query', action='doquery')
+
     mc('/store', controller='redirect', action='redirect',
        dest='http://store.reddit.com/index.html')
 
@@ -138,6 +140,7 @@ def make_map(global_conf={}, app_conf={}):
     mc('/mobile', controller='redirect', action='redirect',
        dest='http://m.reddit.com/')
 
+
     # This route handles displaying the error page and
     # graphics used in the 404/500
diff --git a/r2/r2/controllers/__init__.py b/r2/r2/controllers/__init__.py
index 8ae9b83eb..2cf4e9dc8 100644
--- a/r2/r2/controllers/__init__.py
+++ b/r2/r2/controllers/__init__.py
@@ -21,7 +21,6 @@
 ################################################################################
 from listingcontroller import ListingController
 from listingcontroller import HotController
-from listingcontroller import NormalizedController
 from listingcontroller import SavedController
 from listingcontroller import ToplinksController
 from listingcontroller import NewController
@@ -43,6 +42,8 @@ from post import PostController
 from toolbar import ToolbarController
 from i18n import I18nController
 
+from querycontroller import QueryController
+
 try:
     from r2admin.controllers.adminapi import ApiController
 except ImportError:
diff --git a/r2/r2/controllers/api.py b/r2/r2/controllers/api.py
index 843d1f989..0611f195f 100644
--- a/r2/r2/controllers/api.py
+++ b/r2/r2/controllers/api.py
@@ -36,10 +36,11 @@ from r2.lib.pages import FriendList, ContributorList, ModList, \
 from r2.lib.menus import CommentSortMenu
 from r2.lib.translation import Translator
 
-from r2.lib.normalized_hot import expire_hot, is_top_link
+from r2.lib.normalized_hot import expire_hot
 from r2.lib.captcha import get_iden
 from r2.lib import emailer
 from r2.lib.strings import strings
+from r2.lib.db import queries
 from r2.config import cache
 
 from simplejson import dumps
@@ -281,6 +282,10 @@ class ApiController(RedditController):
         if should_ratelimit:
             VRatelimit.ratelimit(rate_user=True, rate_ip = True)
 
+        #update the queries
+        if g.write_query_queue:
+            queries.new_link(l)
+
         #update the modified flags
         set_last_modified(c.user, 'overview')
         set_last_modified(c.user, 'submitted')
@@ -698,15 +703,21 @@ class ApiController(RedditController):
                else False if dir < 0
                else None)
         organic = vote_type == 'organic'
-        Vote.vote(user, thing, dir, ip, spam, organic)
+        v = Vote.vote(user, thing, dir, ip, spam, organic)
 
         #update last modified
         set_last_modified(c.user, 'liked')
         set_last_modified(c.user, 'disliked')
 
+        #update the queries
+        if g.write_query_queue:
+            queries.new_vote(v)
+
         # flag search indexer that something has changed
         tc.changed(thing)
 
+
+
     @Json
     @validate(VUser(),
               VModhash(),
diff --git a/r2/r2/controllers/errors.py b/r2/r2/controllers/errors.py
index 53b8d28ba..45ef33a78 100644
--- a/r2/r2/controllers/errors.py
+++ b/r2/r2/controllers/errors.py
@@ -102,3 +102,5 @@ class ErrorSet(object):
     def remove(self, error_name):
         if self.errors.has_key(error_name):
             del self.errors[error_name]
+
+class UserRequiredException(Exception): pass
diff --git a/r2/r2/controllers/listingcontroller.py b/r2/r2/controllers/listingcontroller.py
index c468190cb..0c2a15d49 100644
--- a/r2/r2/controllers/listingcontroller.py
+++ b/r2/r2/controllers/listingcontroller.py
@@ -37,7 +37,6 @@ from pylons.i18n import _
 
 import random
 
-
 class ListingController(RedditController):
     """Generalized controller for pages with lists of links."""
 
@@ -54,15 +53,13 @@ class ListingController(RedditController):
     # any text that should be shown on the top of the page
     infotext = None
 
-    # builder class to use to generate the listing
-    builder_cls = QueryBuilder
+    # builder class to use to generate the listing. if none, we'll try
+    # to figure it out based on the query type
+    builder_cls = None
 
     # page title
     title_text = ''
 
-    # toggles the stat collector for keeping track of what queries are being run
-    collect_stats = False
-
     # login box, subreddit box, submit box, etc, visible
     show_sidebar = True
 
@@ -86,10 +83,6 @@ class ListingController(RedditController):
         self.reverse = reverse
 
         self.query_obj = self.query()
-
-        if self.collect_stats and g.REDDIT_MAIN:
-            self.query_obj._stats_collector = g.stats_collector
-
         self.builder_obj = self.builder()
         self.listing_obj = self.listing()
         content = self.content()
@@ -111,14 +104,21 @@
 
     def builder(self):
         #store the query itself so it can be used elsewhere
-        b = self.builder_cls(self.query_obj,
-                             num = self.num,
-                             skip = self.skip,
-                             after = self.after,
-                             count = self.count,
-                             reverse = self.reverse,
-                             prewrap_fn = self.prewrap_fn,
-                             wrap = self.builder_wrapper)
+        if self.builder_cls:
+            builder_cls = self.builder_cls
+        elif isinstance(self.query_obj, Query):
+            builder_cls = QueryBuilder
+        elif isinstance(self.query_obj, list):
+            builder_cls = IDBuilder
+
+        b = builder_cls(self.query_obj,
+                        num = self.num,
+                        skip = self.skip,
+                        after = self.after,
+                        count = self.count,
+                        reverse = self.reverse,
+                        prewrap_fn = self.prewrap_fn,
+                        wrap = self.builder_wrapper)
         return b
 
     def listing(self):
@@ -192,24 +192,24 @@ class HotController(FixListing, ListingController):
 
     def query(self):
+        #no need to worry when working from the cache
+        if g.use_query_cache or c.site == Default:
+            self.fix_listing = False
+
         if c.site == Default:
-            self.fix_listing = False
-            self.builder_cls = IDBuilder
             user = c.user if c.user_is_loggedin else None
             sr_ids = Subreddit.user_subreddits(user)
             links = normalized_hot(sr_ids)
             return links
-        elif (not isinstance(c.site, FakeSubreddit)
+        #if not using the query_cache we still want cached front pages
+        elif (not g.use_query_cache
+              and not isinstance(c.site, FakeSubreddit)
               and self.after is None
              and self.count == 0):
-            self.builder_cls = IDBuilder
             links = [l._fullname for l in get_hot(c.site)]
             return links
         else:
-            q = Link._query(sort = desc('_hot'), *c.site.query_rules())
-            q._read_cache = True
-            self.collect_stats = True
-            return q
+            return c.site.get_links('hot', 'all')
 
     def content(self):
         # only send an organic listing for HTML rendering
@@ -228,21 +228,6 @@ class HotController(FixListing, ListingController):
         self.infotext = request.get.get('deleted') and strings.user_deleted
         return ListingController.GET_listing(self, **env)
 
-class NormalizedController(ListingController):
-    where = 'normalized'
-    builder_cls = IDBuilder
-
-    def query(self):
-        user = c.user if c.user_is_loggedin else None
-        srs = Subreddit._byID(Subreddit.user_subreddits(user),
-                              data = True,
-                              return_dict = False)
-        links = normalized_hot(srs)
-        return links
-
-    def title(self):
-        return c.site.title
-
 class SavedController(ListingController):
     prewrap_fn = lambda self, x: x._thing2
     where = 'saved'
@@ -283,23 +268,15 @@ class NewController(ListingController):
         return [NewMenu(default = self.sort)]
 
     def query(self):
-        sort = NewMenu.operator(self.sort)
-
-        if not sort: # rising
-            names = get_rising(c.site)
-            return names
+        if self.sort == 'rising':
+            return get_rising(c.site)
         else:
-            q = Link._query(sort = sort, read_cache = True,
-                            *c.site.query_rules() )
-            self.collect_stats = True
-            return q
+            return c.site.get_links('new', 'all')
 
     @validate(VSrMask('srs'),
               sort = VMenu('controller', NewMenu))
     def GET_listing(self, sort, **env):
         self.sort = sort
-        if self.sort == 'rising':
-            self.builder_cls = IDBuilder
         return ListingController.GET_listing(self, **env)
 
 class BrowseController(ListingController):
@@ -310,17 +287,7 @@ class BrowseController(ListingController):
         return [ControversyTimeMenu(default = self.time)]
 
     def query(self):
-        q = Link._query(sort = SortMenu.operator(self.sort),
-                        read_cache = True,
-                        *c.site.query_rules())
-
-        if g.REDDIT_MAIN:
-            q._stats_collector = g.stats_collector
-
-        t = TimeMenu.operator(self.time)
-        if t: q._filter(t)
-
-        return q
+        return c.site.get_links(self.sort, self.time)
 
     # TODO: this is a hack with sort.
     @validate(VSrMask('srs'),
@@ -338,7 +305,6 @@ class BrowseController(ListingController):
 
 class RandomrisingController(ListingController):
     where = 'randomrising'
-    builder_cls = IDBuilder
     title_text = _('you\'re really bored now, eh?')
 
     def query(self):
@@ -347,10 +313,10 @@ class RandomrisingController(ListingController):
         if not links:
             # just pull from the new page if the rising page isn't
             # populated for some reason
-            q = Link._query(limit = 200,
-                            data = True,
-                            sort = desc('_date'))
-            links = [ x._fullname for x in q ]
+            links = c.site.get_links('new', 'all')
+            if isinstance(links, Query):
+                links._limit = 200
+                links = [x._fullname for x in links]
 
         random.shuffle(links)
 
@@ -358,7 +324,6 @@
 
 class RecommendedController(ListingController):
     where = 'recommended'
-    builder_cls = IDBuilder
     title_text = _('recommended for you')
 
     @property
@@ -437,9 +402,7 @@ class MessageController(ListingController):
                                   message = message,
                                   success = success)
         return MessagePage(content = content).render()
-
-
 
 class RedditsController(ListingController):
     render_cls = SubredditsPage
diff --git a/r2/r2/controllers/querycontroller.py b/r2/r2/controllers/querycontroller.py
new file mode 100644
index 000000000..aff0bcb8c
--- /dev/null
+++ b/r2/r2/controllers/querycontroller.py
@@ -0,0 +1,15 @@
+from reddit_base import RedditController
+from validator import *
+from r2.lib.db.queries import CachedResults
+
+import cPickle as pickle
+from urllib import unquote
+
+class QueryController(RedditController):
+    @validate(query = nop('query'))
+    def POST_doquery(self, query):
+        if g.enable_doquery:
+            cr = pickle.loads(query)
+            cr.update()
+        else:
+            abort(403, 'forbidden')
diff --git a/r2/r2/controllers/validator/validator.py b/r2/r2/controllers/validator/validator.py
index 291f45e9e..ba8faa7c4 100644
--- a/r2/r2/controllers/validator/validator.py
+++ b/r2/r2/controllers/validator/validator.py
@@ -31,7 +31,7 @@
 from r2.lib.jsonresponse import json_respond
 from r2.models import *
 
-from r2.controllers.errors import errors
+from r2.controllers.errors import errors, UserRequiredException
 
 from copy import copy
 from datetime import datetime, timedelta
@@ -66,14 +66,24 @@ class Validator(object):
 
 def validate(*simple_vals, **param_vals):
     def val(fn):
         def newfn(self, *a, **env):
-            for validator in simple_vals:
-                validator(env)
+            try:
+                for validator in simple_vals:
+                    validator(env)
+
+                kw = self.build_arg_list(fn, env)
+                for var, validator in param_vals.iteritems():
+                    kw[var] = validator(env)
+
+                return fn(self, *a, **kw)
 
-            kw = self.build_arg_list(fn, env)
-            for var, validator in param_vals.iteritems():
-                kw[var] = validator(env)
+            except UserRequiredException:
+                d = dict(dest=reddit_link(request.path, url = True) +
+                         utils.query_string(request.GET))
+                path = "/login"
+                if request.environ.get('extension'):
+                    path += ".%s" % request.environ['extension']
+                return redirect_to(path + utils.query_string(d))
 
-            return fn(self, *a, **kw)
         return newfn
     return val
@@ -303,12 +313,8 @@ class VCaptcha(Validator):
 
 class VUser(Validator):
     def run(self, password = None):
         if not c.user_is_loggedin:
-            #TODO return a real error page
-            d = dict(dest=reddit_link(request.path, url = True) + utils.query_string(request.GET))
-            path = "/login"
-            if request.environ.get('extension'):
-                path += ".%s" % request.environ['extension']
-            return redirect_to(path + utils.query_string(d))
+            raise UserRequiredException
+
         if (password is not None) and not valid_password(c.user, password):
             c.errors.add(errors.WRONG_PASSWORD)
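
The validator change above moves the login redirect out of VUser and into the
validate decorator itself, so any validator (or any code running under a
@validate'd method) can demand a logged-in user by raising
UserRequiredException. A minimal sketch of a hypothetical validator using the
same mechanism (VMustBeAuthor is an invented name, not part of this diff):

    class VMustBeAuthor(Validator):
        """Hypothetical: require login, then check ownership of a thing."""
        def run(self, fullname):
            if not c.user_is_loggedin:
                #caught by validate(), which redirects to /login with dest set
                raise UserRequiredException
            thing = Thing._by_fullname(fullname)
            if thing.author_id != c.user._id:
                abort(403, 'forbidden')
            return thing
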
diff --git a/r2/r2/lib/app_globals.py b/r2/r2/lib/app_globals.py
index 1e2a2e254..30f57b78a 100644
--- a/r2/r2/lib/app_globals.py
+++ b/r2/r2/lib/app_globals.py
@@ -41,18 +41,23 @@ class Globals(object):
                  'num_comments',
                  'max_comments',
                  'num_side_reddits',
+                 'num_precompute_workers',
                  ]
 
     bool_props = ['debug', 'translator',
                   'template_debug',
-                  'uncompressedJS']
+                  'uncompressedJS',
+                  'enable_doquery',
+                  'use_query_cache',
+                  'write_query_queue']
 
     tuple_props = ['memcaches',
                    'rec_cache',
                    'admins',
                    'monitored_servers',
                    'default_srs',
-                   'agents']
+                   'agents',
+                   'query_caches']
 
     def __init__(self, global_conf, app_conf, paths, **extra):
         """
@@ -103,6 +108,7 @@ class Globals(object):
         self.cache = CacheChain((LocalCache(), mc))
 
         self.rec_cache = Memcache(self.rec_cache)
+        self.query_cache = Memcache(self.query_caches)
 
         # set default time zone if one is not set
         self.tz = pytz.timezone(global_conf.get('timezone'))
diff --git a/r2/r2/lib/base.py b/r2/r2/lib/base.py
index f4fd44eb5..6f940c5d3 100644
--- a/r2/r2/lib/base.py
+++ b/r2/r2/lib/base.py
@@ -59,6 +59,10 @@ class BaseController(WSGIController):
         else:
            request.ip = environ['REMOTE_ADDR']
 
+        #if x-dont-decode is set, pylons won't decode the parameters to unicode
+        if environ.get('HTTP_X_DONT_DECODE'):
+            request.charset = None
+
         request.get = storify(request.GET)
         request.post = storify(request.POST)
         request.referer = environ.get('HTTP_REFERER')
diff --git a/r2/r2/lib/db/queries.py b/r2/r2/lib/db/queries.py
new file mode 100644
index 000000000..948158d27
--- /dev/null
+++ b/r2/r2/lib/db/queries.py
@@ -0,0 +1,293 @@
+from r2.models import Account, Link, Comment, Vote, SaveHide
+from r2.models import Message, Inbox, Subreddit
+from r2.lib.db.thing import Thing
+from r2.lib.db.operators import asc, desc, timeago
+from r2.lib.db import query_queue
+from r2.lib.db.sorts import epoch_seconds
+from r2.lib.utils import fetch_things2, worker
+
+from datetime import datetime
+
+from pylons import g
+query_cache = g.query_cache
+
+precompute_limit = 1000
+db_sorts = dict(hot = desc('_hot'),
+                new = desc('_date'),
+                top = desc('_score'),
+                controversial = desc('_controversy'),
+                old = asc('_date'))
+
+db_times = dict(all = None,
+                hour = Thing.c._date >= timeago('1 hour'),
+                day = Thing.c._date >= timeago('1 day'),
+                week = Thing.c._date >= timeago('1 week'),
+                month = Thing.c._date >= timeago('1 month'),
+                year = Thing.c._date >= timeago('1 year'))
+
+#we need to define the filter functions here so cachedresults can be pickled
+def filter_identity(x):
+    return x
+
+def filter_thing2(x):
+    """A filter to apply to the results of a relationship query that returns
+    the object of the relationship."""
+    return x._thing2
+
+class CachedResults(object):
+    """Given a query, returns a list-like object that will lazily look up
+    the query from the persistent cache."""
+    def __init__(self, query, filter = filter_identity):
+        self.query = query
+        self.query._limit = precompute_limit
+        self.filter = filter
+        self.iden = self.query._iden()
+        self.data = []
+        self._fetched = False
+
+    def fetch(self):
+        """Loads the query from the cache."""
+        if not self._fetched:
+            self._fetched = True
+            self.data = query_cache.get(self.iden) or []
+        return list(self)
+
+    def update(self):
+        """Runs the query and stores the result in the cache. It also stores
+        the columns relevant to the sort to make merging with other
+        results faster."""
+        self.data = []
+        sort_cols = [s.col for s in self.query._sort]
+        for i in self.query:
+            item = self.filter(i)
+            l = [item._fullname]
+            for col in sort_cols:
+                attr = getattr(item, col)
+                #convert dates to epochs to take less space
+                if isinstance(attr, datetime):
+                    attr = epoch_seconds(attr)
+                l.append(attr)
+
+            self.data.append(tuple(l))
+
+        self._fetched = True
+        query_cache.set(self.iden, self.data)
+
+    def __repr__(self):
+        return '<CachedResults %s %s>' % (self.query._rules, self.query._sort)
+
+    def __iter__(self):
+        if not self._fetched:
+            self.fetch()
+
+        for x in self.data:
+            yield x[0]
+
+def merge_results(*results):
+    """Given multiple CachedResults, merges their lists based on the sorts of
+    their queries."""
+    if len(results) == 1:
+        return results[0]
+
+    #make sure the sorts match
+    sort = results[0].query._sort
+    assert(all(r.query._sort == sort for r in results[1:]))
+
+    def thing_cmp(t1, t2):
+        for i, s in enumerate(sort):
+            #t1 and t2 are tuples of (fullname, *sort_cols), so we can
+            #get the value to compare right out of the tuple
+            v1, v2 = t1[i + 1], t2[i + 1]
+            if v1 != v2:
+                return cmp(v1, v2) if isinstance(s, asc) else cmp(v2, v1)
+        #they're equal
+        return 0
+
+    all_items = []
+    for r in results:
+        r.fetch()
+        all_items.extend(r.data)
+
+    #all_items = Thing._by_fullname(all_items, return_dict = False)
+    return [i[0] for i in sorted(all_items, cmp = thing_cmp)]
+
+def get_links(sr, sort, time):
+    """General link query for a subreddit."""
+    q = Link._query(Link.c.sr_id == sr._id,
+                    sort = db_sorts[sort])
+    if time != 'all':
+        q._filter(db_times[time])
+    return CachedResults(q)
+
+def user_query(kind, user, sort, time):
+    """General profile-page query."""
+    q = kind._query(kind.c.author_id == user._id,
+                    kind.c._spam == (True, False),
+                    sort = db_sorts[sort])
+    if time != 'all':
+        q._filter(db_times[time])
+    return CachedResults(q)
+
+def get_comments(user, sort, time):
+    return user_query(Comment, user, sort, time)
+
+def get_submitted(user, sort, time):
+    return user_query(Link, user, sort, time)
+
+def get_overview(user, sort, time):
+    return merge_results(get_comments(user, sort, time),
+                         get_submitted(user, sort, time))
+
+def user_rel_query(rel, user, name):
+    """General user relationship query."""
+    q = rel._query(rel.c._thing1_id == user._id,
+                   rel.c._t2_deleted == False,
+                   rel.c._name == name,
+                   sort = desc('_date'),
+                   eager_load = True)
+
+    return CachedResults(q, filter_thing2)
+
+vote_rel = Vote.rel(Account, Link)
+
+def get_liked(user):
+    return user_rel_query(vote_rel, user, '1')
+
+def get_disliked(user):
+    return user_rel_query(vote_rel, user, '-1')
+
+def get_hidden(user):
+    return user_rel_query(SaveHide, user, 'hide')
+
+def get_saved(user):
+    return user_rel_query(SaveHide, user, 'save')
+
+inbox_message_rel = Inbox.rel(Account, Message)
+def get_inbox_messages(user):
+    return user_rel_query(inbox_message_rel, user, 'inbox')
+
+inbox_comment_rel = Inbox.rel(Account, Comment)
+def get_inbox_comments(user):
+    return user_rel_query(inbox_comment_rel, user, 'inbox')
+
+def get_inbox(user):
+    return merge_results(get_inbox_comments(user),
+                         get_inbox_messages(user))
+
+def get_sent(user):
+    q = Message._query(Message.c.author_id == user._id,
+                       Message.c._spam == (True, False),
+                       sort = desc('_date'))
+    return CachedResults(q)
+
+#can be rewritten to be more efficient
+def all_queries(fn, obj, *param_lists):
+    """Given a fn and a first argument 'obj', calls fn(obj, *params)
+    for every combination of the parameters in param_lists."""
+    results = []
+    params = [[obj]]
+    for pl in param_lists:
+        new_params = []
+        for p in pl:
+            for c in params:
+                new_param = list(c)
+                new_param.append(p)
+                new_params.append(new_param)
+        params = new_params
+
+    results = [fn(*p) for p in params]
+    return results
+
+def display_jobs(jobs):
+    for r in jobs:
+        print r
+    print len(jobs)
+
+def add_queries(queries):
+    """Adds multiple queries to the query queue."""
+    def _add_queries():
+        for q in queries:
+            query_queue.add_query(q)
+    worker.do(_add_queries)
+
+## The following functions should be called after their respective
+## actions to update the correct listings.
+def new_link(link):
+    sr = Subreddit._byID(link.sr_id)
+    #author = Account._byID(link.author_id)
+
+    results = all_queries(get_links, sr, ('hot', 'new', 'old'), ['all'])
+    results.extend(all_queries(get_links, sr, ('top', 'controversial'), db_times.keys()))
+    #results.extend(all_queries(get_submitted, author,
+    #                           db_sorts.keys(),
+    #                           db_times.keys()))
+
+    add_queries(results)
+
+def new_comment(comment):
+    author = Account._byID(comment.author_id)
+    results = all_queries(get_comments, author, db_sorts.keys(), db_times.keys())
+
+    if hasattr(comment, 'parent_id'):
+        parent = Comment._byID(comment.parent_id, data = True)
+        parent_author = Account._byID(parent.author_id)
+        results.append(get_inbox_comments(parent_author))
+
+    add_queries(results)
+
+def new_vote(vote):
+    user = vote._thing1
+    item = vote._thing2
+
+    if not isinstance(item, Link):
+        return
+
+    sr = item.subreddit_slow
+    results = all_queries(get_links, sr, ('hot', 'new', 'old'), ['all'])
+    results.extend(all_queries(get_links, sr, ('top', 'controversial'), db_times.keys()))
+
+    #must update both because we don't know if it's a changed vote
+    #results.append(get_liked(user))
+    #results.append(get_disliked(user))
+
+    add_queries(results)
+
+def new_message(message):
+    from_user = Account._byID(message.author_id)
+    to_user = Account._byID(message.to_id)
+
+    results = [get_sent(from_user)]
+    results.append(get_inbox_messages(to_user))
+
+    add_queries(results)
+
+def new_savehide(savehide):
+    user = savehide._thing1
+    if savehide._name == 'save':
+        results = [get_saved(user)]
+    else:
+        results = [get_hidden(user)]
+
+    add_queries(results)
+
+def add_all_srs():
+    """Adds every listing query for every subreddit to the queue."""
+    q = Subreddit._query(sort = asc('_date'))
+    for sr in fetch_things2(q):
+        add_queries(all_queries(get_links, sr, ('hot', 'new', 'old'), ['all']))
+        add_queries(all_queries(get_links, sr, ('top', 'controversial'), db_times.keys()))
+
+def add_all_users():
+    """Adds every profile-page query for every user to the queue."""
+    q = Account._query(sort = asc('_date'))
+    for user in fetch_things2(q):
+        queries = []
+        queries.extend(all_queries(get_submitted, user, db_sorts.keys(), db_times.keys()))
+        queries.extend(all_queries(get_comments, user, db_sorts.keys(), db_times.keys()))
+        queries.append(get_inbox_messages(user))
+        queries.append(get_inbox_comments(user))
+        queries.append(get_saved(user))
+        queries.append(get_hidden(user))
+        queries.append(get_liked(user))
+        queries.append(get_disliked(user))
+        add_queries(queries)
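
To make the flow in the new r2/r2/lib/db/queries.py concrete, here is a small
usage sketch (illustration only, not part of the diff); sr and my_srs stand in
for Subreddit objects, and everything else is defined in the module above:

    from r2.lib.db import queries

    #writer side (the precompute worker): run the query and store
    #(fullname, sort-column...) tuples in g.query_cache under query._iden()
    cr = queries.get_links(sr, 'hot', 'all')
    cr.update()

    #reader side (a listing controller): lazily pull the cached fullnames
    #back out; iterating a CachedResults yields fullnames only
    fullnames = queries.get_links(sr, 'hot', 'all').fetch()

    #front-page style merge: combine several subreddits' cached lists,
    #re-sorted on the stored sort columns without touching the database
    merged = queries.merge_results(*[queries.get_links(s, 'hot', 'all')
                                     for s in my_srs])
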
diff --git a/r2/r2/lib/db/query_queue.py b/r2/r2/lib/db/query_queue.py
new file mode 100644
index 000000000..9d53b7d2b
--- /dev/null
+++ b/r2/r2/lib/db/query_queue.py
@@ -0,0 +1,116 @@
+from __future__ import with_statement
+from r2.lib.workqueue import WorkQueue
+from r2.config.databases import query_queue_engine, tz
+from r2.lib.db.tdb_sql import make_metadata, settings, create_table, index_str
+
+import cPickle as pickle
+from datetime import datetime
+from urllib2 import Request, urlopen
+from urllib import urlencode
+from threading import Lock
+import time
+
+import sqlalchemy as sa
+from sqlalchemy.exceptions import SQLError
+
+from pylons import g
+
+#the set of queries that are currently being run
+running = set()
+running_lock = Lock()
+
+def make_query_queue_table():
+    metadata = make_metadata(query_queue_engine)
+    table = sa.Table(settings.DB_APP_NAME + '_query_queue', metadata,
+                     sa.Column('iden', sa.String, primary_key = True),
+                     sa.Column('query', sa.Binary),
+                     sa.Column('date', sa.DateTime(timezone = True)))
+    date_idx = index_str(table, 'date', 'date')
+    create_table(table, [date_idx])
+    return table
+
+query_queue_table = make_query_queue_table()
+
+def add_query(cached_results):
+    """Adds a CachedResults instance to the queue db, ignoring duplicates."""
+    d = dict(iden = cached_results.query._iden(),
+             query = pickle.dumps(cached_results, -1),
+             date = datetime.now(tz))
+    try:
+        query_queue_table.insert().execute(d)
+    except SQLError, e:
+        #don't worry about inserting duplicates
+        if not 'IntegrityError' in e.message:
+            raise
+
+def remove_query(iden):
+    """Removes the row identified by iden from the query queue. To be
+    called after a CachedResults is updated."""
+    table = query_queue_table
+    d = table.delete(table.c.iden == iden)
+    d.execute()
+
+def get_query():
+    """Gets the next query off the queue, ignoring the currently running
+    queries."""
+    table = query_queue_table
+
+    s = table.select(order_by = sa.asc(table.c.date), limit = 1)
+    s.append_whereclause(sa.and_(*[table.c.iden != i for i in running]))
+    r = s.execute().fetchone()
+
+    if r:
+        return r.iden, r.query
+    else:
+        return None, None
+
+def make_query_job(iden, pickled_cr):
+    """Creates a job to send to the query worker. Updates a cached result,
+    then removes the query from both the queue and the running set. If
+    sending the job fails, the query is only removed from the running
+    set."""
+    precompute_worker = g.query_queue_worker
+    def job():
+        try:
+            finished = False
+            r = Request(url = precompute_worker + '/doquery',
+                        data = urlencode([('query', pickled_cr)]),
+                        #this header prevents pylons from turning the
+                        #parameter into unicode, which breaks pickling
+                        headers = {'x-dont-decode':'true'})
+            urlopen(r)
+            finished = True
+        finally:
+            with running_lock:
+                running.remove(iden)
+                #if finished is false, we'll leave the query in the db
+                #so we can try again later (e.g. in the event the
+                #worker is down)
+                if finished:
+                    remove_query(iden)
+    return job
+
+def run():
+    """Pulls jobs off the queue, creates a job for each, and sends them to a
+    WorkQueue for processing."""
+    num_workers = g.num_query_queue_workers
+    wq = WorkQueue(num_workers = num_workers)
+    wq.start()
+
+    while True:
+        job = None
+        #limit the total number of jobs in the WorkQueue. we don't
+        #need to load the entire db queue right away (the db queue can
+        #get quite large).
+        if wq.jobs.qsize() < 2 * num_workers:
+            with running_lock:
+                iden, pickled_cr = get_query()
+                if pickled_cr is not None:
+                    if not iden in running:
+                        running.add(iden)
+                        job = make_query_job(iden, pickled_cr)
+                        wq.add(job)
+
+        #if we didn't find a job, sleep before trying again
+        if not job:
+            time.sleep(1)
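
A sketch of how the query_queue module above is meant to be driven at runtime
(illustration only; the producer/consumer split follows the docstrings, and
only names defined in this diff are used):

    #producer side: any app server with g.write_query_queue enabled. A vote or
    #submission calls queries.new_vote()/new_link(), which hands each affected
    #CachedResults to query_queue.add_query(); the pickled query lands in the
    #*_query_queue table keyed on its iden, so duplicates collapse to one row.

    #consumer side: a dedicated precompute process.
    from r2.lib.db import query_queue
    query_queue.run()   #loops forever: takes the oldest row not already
                        #running, POSTs the pickle to
                        #g.query_queue_worker + '/doquery', where
                        #QueryController.POST_doquery unpickles it and calls
                        #update(); the row is deleted only if the POST succeeds
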
diff --git a/r2/r2/lib/db/tdb_sql.py b/r2/r2/lib/db/tdb_sql.py
index 89b29037a..3c8a96245 100644
--- a/r2/r2/lib/db/tdb_sql.py
+++ b/r2/r2/lib/db/tdb_sql.py
@@ -60,47 +60,47 @@ def create_table(table, index_commands=None):
         for i in index_commands:
             t.engine.execute(i)
 
-def index_commands(table, type):
-    def index_str(name, on, where = None):
-        index_str = 'create index idx_%s_' % name
-        index_str += table.name
-        index_str += ' on '+ table.name + ' (%s)' % on
-        if where:
-            index_str += ' where %s' % where
-        return index_str
-
+def index_str(table, name, on, where = None):
+    index_str = 'create index idx_%s_' % name
+    index_str += table.name
+    index_str += ' on '+ table.name + ' (%s)' % on
+    if where:
+        index_str += ' where %s' % where
+    return index_str
+
+def index_commands(table, type):
     commands = []
 
     if type == 'thing':
-        commands.append(index_str('id', 'thing_id'))
-        commands.append(index_str('date', 'date'))
-        commands.append(index_str('deleted_spam', 'deleted, spam'))
-        commands.append(index_str('hot', 'hot(ups, downs, date), date'))
-        commands.append(index_str('score', 'score(ups, downs), date'))
-        commands.append(index_str('controversy', 'controversy(ups, downs), date'))
+        commands.append(index_str(table, 'id', 'thing_id'))
+        commands.append(index_str(table, 'date', 'date'))
+        commands.append(index_str(table, 'deleted_spam', 'deleted, spam'))
+        commands.append(index_str(table, 'hot', 'hot(ups, downs, date), date'))
+        commands.append(index_str(table, 'score', 'score(ups, downs), date'))
+        commands.append(index_str(table, 'controversy', 'controversy(ups, downs), date'))
     elif type == 'data':
-        commands.append(index_str('id', 'thing_id'))
-        commands.append(index_str('thing_id', 'thing_id'))
-        commands.append(index_str('key_value', 'key, substring(value, 1, %s)' \
+        commands.append(index_str(table, 'id', 'thing_id'))
+        commands.append(index_str(table, 'thing_id', 'thing_id'))
+        commands.append(index_str(table, 'key_value', 'key, substring(value, 1, %s)' \
                                   % max_val_len))
         #lower name
-        commands.append(index_str('lower_key_value', 'key, lower(value)',
+        commands.append(index_str(table, 'lower_key_value', 'key, lower(value)',
                                   where = "key = 'name'"))
         #ip
-        commands.append(index_str('ip_network', 'ip_network(value)',
+        commands.append(index_str(table, 'ip_network', 'ip_network(value)',
                                   where = "key = 'ip'"))
         #base_url
-        commands.append(index_str('base_url', 'base_url(lower(value))',
+        commands.append(index_str(table, 'base_url', 'base_url(lower(value))',
                                   where = "key = 'url'"))
     elif type == 'rel':
-        commands.append(index_str('thing1_name_date', 'thing1_id, name, date'))
-        commands.append(index_str('thing2_name_date', 'thing2_id, name, date'))
-        commands.append(index_str('thing1_id', 'thing1_id'))
-        commands.append(index_str('thing2_id', 'thing2_id'))
-        commands.append(index_str('name', 'name'))
-        commands.append(index_str('date', 'date'))
+        commands.append(index_str(table, 'thing1_name_date', 'thing1_id, name, date'))
+        commands.append(index_str(table, 'thing2_name_date', 'thing2_id, name, date'))
+        commands.append(index_str(table, 'thing1_id', 'thing1_id'))
+        commands.append(index_str(table, 'thing2_id', 'thing2_id'))
+        commands.append(index_str(table, 'name', 'name'))
+        commands.append(index_str(table, 'date', 'date'))
 
     return commands
 
 def get_type_table(metadata):
diff --git a/r2/r2/lib/db/thing.py b/r2/r2/lib/db/thing.py
index d93f61174..e35007c41 100644
--- a/r2/r2/lib/db/thing.py
+++ b/r2/r2/lib/db/thing.py
@@ -792,7 +792,7 @@ class Query(object):
         lst = None
         if self._read_cache:
             names = cache.get(self._iden())
-            if names:
+            if names is not None:
                 lst = Thing._by_fullname(names,
                                          data = self._data,
                                          return_dict = False)
 
         if lst is None:
diff --git a/r2/r2/lib/media.py b/r2/r2/lib/media.py
index 29c77268d..297bdea12 100644
--- a/r2/r2/lib/media.py
+++ b/r2/r2/lib/media.py
@@ -102,7 +102,7 @@ def process_new_links(period = media_period, force = False):
             jobs.append(make_link_info_job(results, link, g.useragent))
 
     #send links to a queue
-    wq = WorkQueue(jobs, num_workers = 20)
+    wq = WorkQueue(jobs, num_workers = 20, timeout = 30)
     wq.start()
     wq.jobs.join()
diff --git a/r2/r2/lib/normalized_hot.py b/r2/r2/lib/normalized_hot.py
index 460e78872..bf419bfb0 100644
--- a/r2/r2/lib/normalized_hot.py
+++ b/r2/r2/lib/normalized_hot.py
@@ -24,6 +24,7 @@ from r2.lib.db.operators import desc, timeago
 from r2.lib import utils
 from r2.config import cache
 from r2.lib.memoize import memoize
+from r2.lib.db.thing import Query
 
 from pylons import g
 
@@ -43,18 +44,19 @@ def top_key(sr):
     return sr.name + '_top'
 
 def expire_hot(sr):
+    """Called when a subreddit should be recomputed: after a vote (hence,
+    submit) or deletion."""
     cache.set(expire_key(sr), True)
 
-def is_top_link(sr, link):
-    return cache.get(top_key(sr)) == link._fullname
+#def is_top_link(sr, link):
+#    return cache.get(top_key(sr)) == link._fullname
 
-def get_hot(sr):
-    q = Link._query(Link.c.sr_id == sr._id,
-                    sort = desc('_hot'),
-                    write_cache = True,
-                    limit = 150)
-
-    iden = q._iden()
+def cached_query(query, sr):
+    """Returns the results from running the query. The results are cached and
+    only recomputed after 'expire_delta'."""
+    query._limit = 150
+    query._write_cache = True
+    iden = query._iden()
 
     read_cache = True
     #if query is in the cache, the expire flag is true, and the access
@@ -70,25 +72,38 @@
     else:
         read_cache = False
 
+    #set access time to the last time the query was actually run (now)
     if not read_cache:
         cache.set(access_key(sr), datetime.now())
-
-    q._read_cache = read_cache
-    res = list(q)
+
+    query._read_cache = read_cache
+    res = list(query)
 
     #set the #1 link so we can ignore it later. expire after TOP_CACHE
     #just in case something happens and that sr doesn't update
-    if res:
-        cache.set(top_key(sr), res[0]._fullname, TOP_CACHE)
+    #if res:
+    #    cache.set(top_key(sr), res[0]._fullname, TOP_CACHE)
 
     return res
 
+def get_hot(sr):
+    """Get the hottest links for a subreddit. If g.use_query_cache is
+    True, it'll use the query cache, otherwise it'll use cached_query()
+    from above."""
+    q = sr.get_links('hot', 'all')
+    if isinstance(q, Query):
+        return cached_query(q, sr)
+    else:
+        return Link._by_fullname(q[:150], return_dict = False)
+
 def only_recent(items):
     return filter(lambda l: l._date > utils.timeago('%d day' % g.HOT_PAGE_AGE),
                   items)
 
 @memoize('normalize_hot', time = g.page_cache_time)
 def normalized_hot_cached(sr_ids):
+    """Fetches the hot lists for each subreddit, normalizes the scores,
+    and interleaves the results."""
     results = []
     srs = Subreddit._byID(sr_ids, data = True, return_dict = False)
     for sr in srs:
diff --git a/r2/r2/lib/organic.py b/r2/r2/lib/organic.py
index ccf4a96ed..079040431 100644
--- a/r2/r2/lib/organic.py
+++ b/r2/r2/lib/organic.py
@@ -21,7 +21,7 @@
 ################################################################################
 from r2.models import *
 from r2.lib.memoize import memoize
-from r2.lib.normalized_hot import is_top_link, get_hot, only_recent
+from r2.lib.normalized_hot import get_hot, only_recent
 from r2.lib import count
 
 import random
diff --git a/r2/r2/lib/workqueue.py b/r2/r2/lib/workqueue.py
index 62716889a..3c46a932d 100644
--- a/r2/r2/lib/workqueue.py
+++ b/r2/r2/lib/workqueue.py
@@ -32,39 +32,40 @@ class WorkQueue(object):
     """A WorkQueue is a queue that takes a number of functions and runs
     them in parallel"""
 
-    def __init__(self, jobs, num_workers = 5, timeout = 30):
+    def __init__(self, jobs = [], num_workers = 5, timeout = None):
         """Creates a WorkQueue that will process jobs with num_workers
         threads. If a job takes longer than timeout seconds to run,
         WorkQueue won't wait for it to finish before claiming to be
         finished."""
         self.jobs = Queue()
         self.work_count = Queue(num_workers)
         self.workers = {}
-        self.timeout = timedelta(seconds = timeout)
+        if timeout:
+            self.timeout = timedelta(seconds = timeout)
+        else:
+            self.timeout = None
 
         for j in jobs:
             self.jobs.put(j)
 
     def monitor(self):
-        done = False
-        while not done:
-            if self.jobs.empty() and not self.workers:
-                done = True
-
+        """The monitoring thread. Every second it checks for finished, dead,
+        or timed-out jobs and removes them from the queue."""
+        while True:
             for worker, start_time in self.workers.items():
                 if (not worker.isAlive() or
-                    datetime.now() - start_time > self.timeout):
+                    self.timeout
+                    and datetime.now() - start_time > self.timeout):
+
                     self.work_count.get_nowait()
                     self.jobs.task_done()
                     del self.workers[worker]
 
             time.sleep(1)
 
-    def start(self):
-        monitor_thread = Thread(target = self.monitor)
-        monitor_thread.setDaemon(True)
-        monitor_thread.start()
-
-        while not self.jobs.empty():
+    def run(self):
+        """The main thread for the queue. Pulls a job off the job queue and
+        creates a thread for it."""
+        while True:
             job = self.jobs.get()
 
             work_thread = Thread(target = job)
@@ -73,18 +74,47 @@
             self.workers[work_thread] = datetime.now()
             work_thread.start()
 
-if __name__ == '__main__':
+    def start(self):
+        """Spawn a monitoring thread and the main thread for this queue.
""" + monitor_thread = Thread(target = self.monitor) + monitor_thread.setDaemon(True) + monitor_thread.start() + + main_thread = Thread(target = self.run) + main_thread.setDaemon(True) + main_thread.start() + + def add(self, job): + """Put a new job on the queue.""" + self.jobs.put(job) + + def wait(self): + """Blocks until every job that has been added to the queue is + finished.""" + self.jobs.join() + +def test(): def make_job(n): import random, time def job(): print 'starting %s' % n + blah time.sleep(random.randint(1, 10)) print 'ending %s' % n return job jobs = [make_job(n) for n in xrange(10)] - wq = WorkQueue(jobs, timeout = 2) + wq = WorkQueue(jobs, timeout = 5) wq.start() - wq.jobs.join() + wq.wait() + + #wq = WorkQueue() + #wq.start() + #wq.add(make_job(10)) + #print 'added job' + #wq.add(make_job(3)) + #print 'added another' + #q.wait() + print 'DONE' diff --git a/r2/r2/models/subreddit.py b/r2/r2/models/subreddit.py index c2f329325..faebbaafd 100644 --- a/r2/r2/models/subreddit.py +++ b/r2/r2/models/subreddit.py @@ -30,6 +30,7 @@ from r2.lib.db.operators import lower, or_, and_, desc from r2.lib.memoize import memoize, clear_memo from r2.lib.utils import tup from r2.lib.strings import strings, Score + import os.path class Subreddit(Thing, Printable): @@ -199,6 +200,14 @@ class Subreddit(Thing, Printable): #really we mean Link.c.sr_id, but rules are type agnostic return (self.c.sr_id == self._id,) + def get_links(self, sort, time): + from r2.lib.db import queries + q = queries.get_links(self, sort, time) + if g.use_query_cache: + return q.fetch() + else: + return q.query + @classmethod def add_props(cls, user, wrapped): names = ('subscriber', 'moderator', 'contributor') @@ -345,6 +354,20 @@ class FriendsSR(FakeSubreddit): else: return (self.c.sr_id == self.default_srs(c.content_langs, ids = True),) + def get_links(self, sort, time): + from r2.lib.db import queries + from r2.models import Link + from r2.controllers.errors import UserRequiredException + + if not c.user_is_loggedin: + raise UserRequiredException + + q = Link._query(self.c.author_id == c.user.friends, + sort = queries.db_sorts[sort]) + if time != 'all': + q._filter(queries.db_times[time]) + return q + class AllSR(FakeSubreddit): name = 'all' title = 'all' @@ -355,6 +378,15 @@ class AllSR(FakeSubreddit): else: return () + def get_links(self, sort, time): + from r2.models import Link + from r2.lib.db import queries + q = Link._query(sort = queries.db_sorts[sort]) + if time != 'all': + q._filter(queries.db_times[time]) + return q + + class DefaultSR(FakeSubreddit): #notice the space before reddit.com name = ' reddit.com' @@ -366,6 +398,26 @@ class DefaultSR(FakeSubreddit): subreddits = Subreddit.user_subreddits(user) return (self.c.sr_id == subreddits,) + def get_links_srs(self, srs, sort, time): + from r2.lib.db import queries + from r2.models import Link + if g.use_query_cache: + results = [] + for sr in srs: + results.append(queries.get_links(sr, sort, time)) + + return queries.merge_results(*results) + else: + q = Link._query(sort = queries.db_sorts[sort]) + if time != 'all': + q._filter(queries.db_times[time]) + return q + + def get_links(self, sort, time): + user = c.user if c.user_is_loggedin else None + srs = Subreddit._byID(Subreddit.user_subreddits(user), return_dict = False) + return self.get_links_srs(srs, sort, time) + @property def title(self): return _("reddit.com: what's new online!") @@ -394,6 +446,13 @@ class MaskedSR(DefaultSR): subreddits.extend(self.show_sr) return (self.c.sr_id == subreddits,) + 
diff --git a/r2/r2/models/subreddit.py b/r2/r2/models/subreddit.py
index c2f329325..faebbaafd 100644
--- a/r2/r2/models/subreddit.py
+++ b/r2/r2/models/subreddit.py
@@ -30,6 +30,7 @@ from r2.lib.db.operators import lower, or_, and_, desc
 from r2.lib.memoize import memoize, clear_memo
 from r2.lib.utils import tup
 from r2.lib.strings import strings, Score
+
 import os.path
 
 class Subreddit(Thing, Printable):
@@ -199,6 +200,14 @@ class Subreddit(Thing, Printable):
         #really we mean Link.c.sr_id, but rules are type agnostic
         return (self.c.sr_id == self._id,)
 
+    def get_links(self, sort, time):
+        from r2.lib.db import queries
+        q = queries.get_links(self, sort, time)
+        if g.use_query_cache:
+            return q.fetch()
+        else:
+            return q.query
+
     @classmethod
     def add_props(cls, user, wrapped):
         names = ('subscriber', 'moderator', 'contributor')
@@ -345,6 +354,20 @@ class FriendsSR(FakeSubreddit):
         else:
             return (self.c.sr_id == self.default_srs(c.content_langs, ids = True),)
 
+    def get_links(self, sort, time):
+        from r2.lib.db import queries
+        from r2.models import Link
+        from r2.controllers.errors import UserRequiredException
+
+        if not c.user_is_loggedin:
+            raise UserRequiredException
+
+        q = Link._query(self.c.author_id == c.user.friends,
+                        sort = queries.db_sorts[sort])
+        if time != 'all':
+            q._filter(queries.db_times[time])
+        return q
+
 class AllSR(FakeSubreddit):
     name = 'all'
     title = 'all'
@@ -355,6 +378,15 @@ class AllSR(FakeSubreddit):
         else:
             return ()
 
+    def get_links(self, sort, time):
+        from r2.models import Link
+        from r2.lib.db import queries
+        q = Link._query(sort = queries.db_sorts[sort])
+        if time != 'all':
+            q._filter(queries.db_times[time])
+        return q
+
+
 class DefaultSR(FakeSubreddit):
     #notice the space before reddit.com
     name = ' reddit.com'
@@ -366,6 +398,26 @@ class DefaultSR(FakeSubreddit):
         subreddits = Subreddit.user_subreddits(user)
         return (self.c.sr_id == subreddits,)
 
+    def get_links_srs(self, srs, sort, time):
+        from r2.lib.db import queries
+        from r2.models import Link
+        if g.use_query_cache:
+            results = []
+            for sr in srs:
+                results.append(queries.get_links(sr, sort, time))
+
+            return queries.merge_results(*results)
+        else:
+            q = Link._query(sort = queries.db_sorts[sort])
+            if time != 'all':
+                q._filter(queries.db_times[time])
+            return q
+
+    def get_links(self, sort, time):
+        user = c.user if c.user_is_loggedin else None
+        srs = Subreddit._byID(Subreddit.user_subreddits(user), return_dict = False)
+        return self.get_links_srs(srs, sort, time)
+
     @property
     def title(self):
         return _("reddit.com: what's new online!")
@@ -394,6 +446,13 @@ class MaskedSR(DefaultSR):
         subreddits.extend(self.show_sr)
         return (self.c.sr_id == subreddits,)
 
+    def get_links(self, sort, time):
+        user = c.user if c.user_is_loggedin else None
+        subreddits = Subreddit.user_subreddits(user)
+        subreddits = [s for s in subreddits if s not in self.hide_sr]
+        subreddits.extend(self.show_sr)
+        return self.get_links_srs(subreddits, sort, time)
+
 class SubSR(FakeSubreddit):
     stylesheet = 'subreddit.css'
diff --git a/r2/r2/models/vote.py b/r2/r2/models/vote.py
index 1d9233f38..5b7360f69 100644
--- a/r2/r2/models/vote.py
+++ b/r2/r2/models/vote.py
@@ -120,6 +120,8 @@ class Vote(MultiRelation('vote',
         if kind == 'link' and v.valid_thing:
             expire_hot(sr)
 
+        return v
+
     #TODO make this generic and put on multirelation?
     @classmethod
     def likes(cls, sub, obj):
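
Taken together, the subreddit and listing-controller changes mean a controller
no longer cares where its listing comes from: c.site.get_links() returns either
a plain Query (built directly against the database) or, when g.use_query_cache
is on, a list of precomputed fullnames, and ListingController.builder() then
picks QueryBuilder or IDBuilder to match. Roughly (a paraphrase of the code
above, not new behaviour):

    query_obj = c.site.get_links('hot', 'all')
    if isinstance(query_obj, Query):
        #no query cache: a live db query, paginated by QueryBuilder
        builder = QueryBuilder(query_obj, wrap = builder_wrapper)
    else:
        #query cache on: a precomputed list of fullnames, served by IDBuilder
        builder = IDBuilder(query_obj, wrap = builder_wrapper)
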