From 16d0b1c480cd9cf594deb28909dc26f91ae9436a Mon Sep 17 00:00:00 2001
From: Neil Williams
Date: Fri, 19 Oct 2012 13:37:26 -0700
Subject: [PATCH] query_cache: Add support for precomputed queries.

---
Review notes (commentary only; ignored by git-am and not part of the commit):

 * Line breaks, indentation and blank context lines in this patch were
   re-derived from the hunk headers and from Python block structure.  The
   hunk line counts and the diffstat agree with the result, but the exact
   position of blank context lines and the indentation widths should be
   verified against the parent blob (c21ea8b18) before applying.
 * NOTE(review): index_mangle_keys() builds its result from the keys of the
   rows it reads back for the "KEY/index" lookups, so each returned key
   presumably has the form "KEY/index/COLUMN".  Confirm that this is the row
   key the precomputer actually writes to.
 * NOTE(review): _fetch_multi() now fetches precomputed rows under mangled
   keys.  The per-query lookup that follows the hunk is not visible here;
   confirm that it can still find those results by q.key.

 r2/r2/models/query_cache.py | 81 ++++++++++++++++++++++++++++++++++---
 1 file changed, 75 insertions(+), 6 deletions(-)

diff --git a/r2/r2/models/query_cache.py b/r2/r2/models/query_cache.py
index c21ea8b18..0f4eeeb8c 100644
--- a/r2/r2/models/query_cache.py
+++ b/r2/r2/models/query_cache.py
@@ -43,7 +43,7 @@ from pycassa.batch import Mutator
 
 from r2.models import Thing
 from r2.lib.db import tdb_cassandra
-from r2.lib.db.operators import asc
+from r2.lib.db.operators import asc, desc, BooleanOp
 from r2.lib.db.sorts import epoch_seconds
 from r2.lib.utils import flatten, to36
 
@@ -129,11 +129,12 @@ class CachedQuery(_CachedQueryBase):
 
     """
 
-    def __init__(self, model, key, sort, filter_fn):
+    def __init__(self, model, key, sort, filter_fn, is_precomputed):
         self.model = model
         self.key = key
         self.filter = filter_fn
         self.timestamps = None # column timestamps, for safe pruning
+        self.is_precomputed = is_precomputed
         super(CachedQuery, self).__init__(sort)
 
     def _make_item_tuple(self, item):
@@ -159,13 +160,28 @@ class CachedQuery(_CachedQueryBase):
 
     @classmethod
     def _fetch_multi(self, queries):
+        """Fetch the unsorted query results for multiple queries at once.
+
+        In the case of precomputed queries, do an extra lookup first to
+        determine which row key to find the latest precomputed values for the
+        query in.
+
+        """
+
         by_model = collections.defaultdict(list)
         for q in queries:
             by_model[q.model].append(q)
 
         cached_queries = {}
         for model, queries in by_model.iteritems():
-            fetched = model.get([q.key for q in queries])
+            pure, need_mangling = [], []
+            for q in queries:
+                if not q.is_precomputed:
+                    pure.append(q.key)
+                else:
+                    need_mangling.append(q.key)
+            mangled = model.index_mangle_keys(need_mangling)
+            fetched = model.get(pure + mangled)
             cached_queries.update(fetched)
 
         for q in queries:
@@ -291,6 +307,7 @@ class CachedQueryMutator(object):
 
         LOG.debug("Inserting %r into query %r", things, query)
 
+        assert not query.is_precomputed
         query._insert(self.mutator, things)
 
         if (random.random() / len(things)) < PRUNE_CHANCE:
@@ -339,6 +356,33 @@ def filter_thing2(x):
     return x._thing2
 
 
+def _is_query_precomputed(query):
+    """Return if this query must be updated offline in a batch job.
+
+    Simple queries can be modified in place in the query cache, but ones
+    with more complicated eligibility criteria, such as a time limit ("top
+    this month") cannot be modified this way and must instead be
+    recalculated periodically. Rather than replacing a single row
+    repeatedly, the precomputer stores in a new row every time it runs and
+    updates an index of the latest run.
+
+    """
+
+    # visit all the nodes in the rule tree to see if there are time limitations
+    # if we find one, this query is one that must be precomputed
+    rules = list(query._rules)
+    while rules:
+        rule = rules.pop()
+
+        if isinstance(rule, BooleanOp):
+            rules.extend(rule.ops)
+            continue
+
+        if rule.lval.name == "_date":
+            return True
+    return False
+
+
 def cached_query(model, filter_fn=filter_identity, sort=None):
     """Decorate a function describing a cached query.
 
@@ -375,10 +419,19 @@ def cached_query(model, filter_fn=filter_identity, sort=None):
             row_key = '.'.join(row_key_components)
 
             query = fn(*args)
-            query_sort = query._sort if query else sort
-            assert query_sort
 
-            return CachedQuery(model, row_key, query_sort, filter_fn)
+            if query:
+                # sql-backed query
+                query_sort = query._sort
+                is_precomputed = _is_query_precomputed(query)
+            else:
+                # pure-cassandra query
+                assert sort
+                query_sort = sort
+                is_precomputed = False
+
+            return CachedQuery(model, row_key, query_sort, filter_fn,
+                               is_precomputed)
         return cached_query_wrapper
     return cached_query_decorator
 
@@ -443,6 +496,22 @@ class _BaseQueryCache(object):
 
         return res
 
+    @classmethod
+    def index_mangle_keys(cls, keys):
+        if not keys:
+            return []
+
+        index_keys = ["/".join((key, "index")) for key in keys]
+        rows = cls._cf.multiget(index_keys,
+                                column_reversed=True,
+                                column_count=1)
+
+        res = []
+        for key, columns in rows.iteritems():
+            index_component = columns.keys()[0]
+            res.append("/".join((key, index_component)))
+        return res
+
     @classmethod
     @tdb_cassandra.will_write
     def insert(cls, mutator, key, columns):